This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push:
new 91d6c98 [SPARK-51719] Support `table` for `SparkSession` and `DataFrameReader`
91d6c98 is described below
commit 91d6c98a39498a752f5ada579b8c18c25b6bc2b5
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun Apr 6 10:33:51 2025 +0900
[SPARK-51719] Support `table` for `SparkSession` and `DataFrameReader`
### What changes were proposed in this pull request?
This PR aims to support the `table` API for `SparkSession` and `DataFrameReader`.
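As a rough usage sketch (the table name `people` is a placeholder, and a running Spark Connect server is assumed), the new API can be invoked from either entry point:

```swift
import SparkConnect

// Minimal sketch of the `table` API added here; `people` is a
// hypothetical table name used only for illustration.
let spark = try await SparkSession.builder.getOrCreate()

// Via the session itself (delegates to `read.table`).
let df1 = try await spark.table("people")

// Via the `DataFrameReader`.
let df2 = await spark.read.table("people")

print(try await df1.count())
await spark.stop()
```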
### Why are the changes needed?
For feature parity.
### Does this PR introduce _any_ user-facing change?
No, this is a new addition to the unreleased version.
### How was this patch tested?
Pass the CIs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #42 from dongjoon-hyun/SPARK-51719.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
Sources/SparkConnect/DataFrameReader.swift | 27 ++++++++++++++++++++++
Sources/SparkConnect/SparkSession.swift | 14 +++++++++++
Sources/SparkConnect/TypeAliases.swift | 1 +
Tests/SparkConnectTests/DataFrameReaderTests.swift | 10 ++++++++
Tests/SparkConnectTests/SparkSessionTests.swift | 10 ++++++++
5 files changed, 62 insertions(+)
diff --git a/Sources/SparkConnect/DataFrameReader.swift b/Sources/SparkConnect/DataFrameReader.swift
index 47f022f..cfa41f9 100644
--- a/Sources/SparkConnect/DataFrameReader.swift
+++ b/Sources/SparkConnect/DataFrameReader.swift
@@ -40,6 +40,33 @@ public actor DataFrameReader: Sendable {
self.sparkSession = sparkSession
}
+ /// Returns the specified table/view as a ``DataFrame``. If it's a table, it must support batch
+ /// reading and the returned ``DataFrame`` is the batch scan query plan of this table. If it's a
+ /// view, the returned ``DataFrame`` is simply the query plan of the view, which can either be a
+ /// batch or streaming query plan.
+ ///
+ /// - Parameter tableName: a qualified or unqualified name that designates a table or view. If a database is
+ /// specified, it identifies the table/view from the database. Otherwise, it first attempts to
+ /// find a temporary view with the given name and then match the table/view from the current
+ /// database. Note that the global temporary view database is also valid here.
+ /// - Returns: A ``DataFrame`` instance.
+ public func table(_ tableName: String) -> DataFrame {
+ var namedTable = NamedTable()
+ namedTable.unparsedIdentifier = tableName
+ namedTable.options = self.extraOptions.toStringDictionary()
+
+ var read = Read()
+ read.namedTable = namedTable
+
+ var relation = Relation()
+ relation.read = read
+
+ var plan = Plan()
+ plan.opType = .root(relation)
+
+ return DataFrame(spark: sparkSession, plan: plan)
+ }
+
/// Specifies the input data source format.
/// - Parameter source: A string.
/// - Returns: A `DataFrameReader`.
diff --git a/Sources/SparkConnect/SparkSession.swift b/Sources/SparkConnect/SparkSession.swift
index 39b6bbe..2fa583c 100644
--- a/Sources/SparkConnect/SparkSession.swift
+++ b/Sources/SparkConnect/SparkSession.swift
@@ -125,6 +125,20 @@ public actor SparkSession {
}
}
+ /// Returns the specified table/view as a ``DataFrame``. If it's a table, it must support batch
+ /// reading and the returned ``DataFrame`` is the batch scan query plan of this table. If it's a
+ /// view, the returned ``DataFrame`` is simply the query plan of the view, which can either be a
+ /// batch or streaming query plan.
+ ///
+ /// - Parameter tableName: a qualified or unqualified name that designates a table or view. If a database is
+ /// specified, it identifies the table/view from the database. Otherwise, it first attempts to
+ /// find a temporary view with the given name and then match the table/view from the current
+ /// database. Note that the global temporary view database is also valid here.
+ /// - Returns: A ``DataFrame`` instance.
+ public func table(_ tableName: String) async throws -> DataFrame {
+ return await read.table(tableName)
+ }
+
/// Executes some code block and prints to stdout the time taken to execute the block.
/// - Parameter f: A function to execute.
/// - Returns: The result of the executed code.
diff --git a/Sources/SparkConnect/TypeAliases.swift b/Sources/SparkConnect/TypeAliases.swift
index 198df89..f186729 100644
--- a/Sources/SparkConnect/TypeAliases.swift
+++ b/Sources/SparkConnect/TypeAliases.swift
@@ -31,6 +31,7 @@ typealias Filter = Spark_Connect_Filter
typealias KeyValue = Spark_Connect_KeyValue
typealias Limit = Spark_Connect_Limit
typealias MapType = Spark_Connect_DataType.Map
+typealias NamedTable = Spark_Connect_Read.NamedTable
typealias OneOf_Analyze = AnalyzePlanRequest.OneOf_Analyze
typealias Plan = Spark_Connect_Plan
typealias Project = Spark_Connect_Project
diff --git a/Tests/SparkConnectTests/DataFrameReaderTests.swift b/Tests/SparkConnectTests/DataFrameReaderTests.swift
index c159b8f..5c0979d 100644
--- a/Tests/SparkConnectTests/DataFrameReaderTests.swift
+++ b/Tests/SparkConnectTests/DataFrameReaderTests.swift
@@ -64,4 +64,14 @@ struct DataFrameReaderTests {
#expect(try await spark.read.parquet(path, path).count() == 4)
await spark.stop()
}
+
+ @Test
+ func table() async throws {
+ let tableName = UUID().uuidString.replacingOccurrences(of: "-", with: "")
+ let spark = try await SparkSession.builder.getOrCreate()
+ #expect(try await spark.sql("CREATE TABLE \(tableName) AS VALUES (1), (2)").count() == 0)
+ #expect(try await spark.read.table(tableName).count() == 2)
+ #expect(try await spark.sql("DROP TABLE \(tableName)").count() == 0)
+ await spark.stop()
+ }
}
diff --git a/Tests/SparkConnectTests/SparkSessionTests.swift b/Tests/SparkConnectTests/SparkSessionTests.swift
index f302349..cba57e4 100644
--- a/Tests/SparkConnectTests/SparkSessionTests.swift
+++ b/Tests/SparkConnectTests/SparkSessionTests.swift
@@ -75,6 +75,16 @@ struct SparkSessionTests {
await spark.stop()
}
+ @Test
+ func table() async throws {
+ let tableName = UUID().uuidString.replacingOccurrences(of: "-", with: "")
+ let spark = try await SparkSession.builder.getOrCreate()
+ #expect(try await spark.sql("CREATE TABLE \(tableName) AS VALUES (1), (2)").count() == 0)
+ #expect(try await spark.table(tableName).count() == 2)
+ #expect(try await spark.sql("DROP TABLE \(tableName)").count() == 0)
+ await spark.stop()
+ }
+
@Test
func time() async throws {
let spark = try await SparkSession.builder.getOrCreate()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]