This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new ca07010  [SPARK-51730] Add `Catalog` actor and support 
`catalog/database` APIs
ca07010 is described below

commit ca07010450487478468ec1a0d1f114a678273e17
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Mon Apr 7 20:34:29 2025 +0900

    [SPARK-51730] Add `Catalog` actor and support `catalog/database` APIs
    
    ### What changes were proposed in this pull request?
    
    This PR aims to add `Catalog` actor and support `catalog/database` APIs. 
Other APIs (`table/function/column`) will be added independently as the second 
part.
    
    ### Why are the changes needed?
    
    For feature parity.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is a new addition to the unreleased version.
    
    ### How was this patch tested?
    
    Pass the CIs and manual test on macOS environment.
    
    ```
    $ swift test --filter CatalogTests
    ...
    􀟈  Test databaseExists() started.
    􀟈  Test setCurrentCatalog() started.
    􀟈  Test setCurrentDatabase() started.
    􀟈  Test getDatabase() started.
    􀟈  Test listDatabases() started.
    􀟈  Test listCatalogs() started.
    􀟈  Test currentDatabase() started.
    􀟈  Test currentCatalog() started.
    􁁛  Test currentDatabase() passed after 0.061 seconds.
    􁁛  Test currentCatalog() passed after 0.061 seconds.
    􁁛  Test setCurrentCatalog() passed after 0.067 seconds.
    􁁛  Test setCurrentDatabase() passed after 0.071 seconds.
    􁁛  Test getDatabase() passed after 0.138 seconds.
    􁁛  Test listCatalogs() passed after 0.143 seconds.
    􁁛  Test databaseExists() passed after 0.158 seconds.
    􁁛  Test listDatabases() passed after 0.188 seconds.
    􁁛  Suite CatalogTests passed after 0.189 seconds.
    􁁛  Test run with 8 tests passed after 0.189 seconds.
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44 from dongjoon-hyun/SPARK-51730.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 Sources/SparkConnect/Catalog.swift         | 202 +++++++++++++++++++++++++++++
 Sources/SparkConnect/SparkSession.swift    |   7 +
 Sources/SparkConnect/TypeAliases.swift     |   1 +
 Tests/SparkConnectTests/CatalogTests.swift | 107 +++++++++++++++
 4 files changed, 317 insertions(+)

diff --git a/Sources/SparkConnect/Catalog.swift 
b/Sources/SparkConnect/Catalog.swift
new file mode 100644
index 0000000..ed37bf7
--- /dev/null
+++ b/Sources/SparkConnect/Catalog.swift
@@ -0,0 +1,202 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+import Foundation
+
+public struct CatalogMetadata: Sendable, Equatable {
+  public var name: String
+  public var description: String? = nil
+}
+
+public struct Database: Sendable, Equatable {
+  public var name: String
+  public var catalog: String? = nil
+  public var description: String? = nil
+  public var locationUri: String
+}
+
+// TODO: Rename `SparkTable` to `Table` after removing Arrow and Flatbuffer
+// from `SparkConnect` module. Currently, `SparkTable` is used to avoid the 
name conflict.
+public struct SparkTable: Sendable, Equatable {
+  public var name: String
+  public var catalog: String?
+  public var namespace: [String]?
+  public var description: String?
+  public var tableType: String
+  public var isTemporary: Bool
+  public var database: String? {
+    get {
+      guard let namespace else {
+        return nil
+      }
+      if namespace.count == 1 {
+        return namespace[0]
+      } else {
+        return nil
+      }
+    }
+  }
+}
+
+public struct Column: Sendable, Equatable {
+  public var name: String
+  public var description: String?
+  public var dataType: String
+  public var nullable: Bool
+  public var isPartition: Bool
+  public var isBucket: Bool
+  public var isCluster: Bool
+}
+
+public struct Function: Sendable, Equatable {
+  public var name: String
+  public var catalog: String?
+  public var namespace: [String]?
+  public var description: String?
+  public var className: String
+  public var isTemporary: Bool
+}
+
+/// Interface through which the user may create, drop, alter or query 
underlying databases, tables, functions etc.
+/// To access this, use ``SparkSession.catalog``.
+public actor Catalog: Sendable {
+  var spark: SparkSession
+
+  init(spark: SparkSession) {
+    self.spark = spark
+  }
+
+  /// A helper method to create a `Spark_Connect_Catalog`-based plan.
+  /// - Parameter f: A lambda function to create `Spark_Connect_Catalog`.
+  /// - Returns: A ``DataFrame`` contains the result of the given catalog 
operation.
+  private func getDataFrame(_ f: () -> Spark_Connect_Catalog) -> DataFrame {
+    var relation = Relation()
+    relation.catalog = f()
+    var plan = Plan()
+    plan.opType = .root(relation)
+    return DataFrame(spark: spark, plan: plan)
+  }
+
+  /// Returns the current default catalog in this session.
+  /// - Returns: A catalog name.
+  public func currentCatalog() async throws -> String {
+    let df = getDataFrame({
+      var catalog = Spark_Connect_Catalog()
+      catalog.catType = .currentCatalog(Spark_Connect_CurrentCatalog())
+      return catalog
+    })
+    return try await df.collect()[0][0]!
+  }
+
+  /// Sets the current default catalog in this session.
+  /// - Parameter catalogName: name of the catalog to set
+  public func setCurrentCatalog(_ catalogName: String) async throws {
+    let df = getDataFrame({
+      var setCurrentCatalog = Spark_Connect_SetCurrentCatalog()
+      setCurrentCatalog.catalogName = catalogName
+
+      var catalog = Spark_Connect_Catalog()
+      catalog.catType = .setCurrentCatalog(setCurrentCatalog)
+      return catalog
+    })
+    _ = try await df.count()
+  }
+
+  /// Returns a list of catalogs in this session.
+  /// - Returns: A list of ``CatalogMetadata``.
+  public func listCatalogs(pattern: String? = nil) async throws -> 
[CatalogMetadata] {
+    let df = getDataFrame({
+      var listCatalogs = Spark_Connect_ListCatalogs()
+      if let pattern {
+        listCatalogs.pattern = pattern
+      }
+      var catalog = Spark_Connect_Catalog()
+      catalog.catType = .listCatalogs(listCatalogs)
+      return catalog
+    })
+    return try await df.collect().map {
+      CatalogMetadata(name: $0[0]!, description: $0[1])
+    }
+  }
+
+  /// Returns the current default database in this session.
+  /// - Returns: The current default database name.
+  public func currentDatabase() async throws -> String {
+    let df = getDataFrame({
+      var catalog = Spark_Connect_Catalog()
+      catalog.catType = .currentDatabase(Spark_Connect_CurrentDatabase())
+      return catalog
+    })
+    return try await df.collect()[0][0]!
+  }
+
+  /// Sets the current default database in this session.
+  /// - Parameter dbName: name of the database to set
+  public func setCurrentDatabase(_ dbName: String) async throws {
+    let df = getDataFrame({
+      var setCurrentDatabase = Spark_Connect_SetCurrentDatabase()
+      setCurrentDatabase.dbName = dbName
+
+      var catalog = Spark_Connect_Catalog()
+      catalog.catType = .setCurrentDatabase(setCurrentDatabase)
+      return catalog
+    })
+    _ = try await df.count()
+  }
+
+  /// Returns a list of databases available across all sessions.
+  /// - Parameter pattern: The pattern that the database name needs to match.
+  /// - Returns: A list of ``Database``.
+  public func listDatabases(pattern: String? = nil) async throws -> [Database] 
{
+    let df = getDataFrame({
+      var listDatabases = Spark_Connect_ListDatabases()
+      if let pattern {
+        listDatabases.pattern = pattern
+      }
+      var catalog = Spark_Connect_Catalog()
+      catalog.catType = .listDatabases(listDatabases)
+      return catalog
+    })
+    return try await df.collect().map {
+      Database(name: $0[0]!, catalog: $0[1], description: $0[2], locationUri: 
$0[3]!)
+    }
+  }
+
+  /// Get the database with the specified name.
+  /// - Parameter dbName: name of the database to get.
+  /// - Returns: The database found by the name.
+  public func getDatabase(_ dbName: String) async throws -> Database {
+    let df = getDataFrame({
+      var db = Spark_Connect_GetDatabase()
+      db.dbName = dbName
+      var catalog = Spark_Connect_Catalog()
+      catalog.catType = .getDatabase(db)
+      return catalog
+    })
+    return try await df.collect().map {
+      Database(name: $0[0]!, catalog: $0[1], description: $0[2], locationUri: 
$0[3]!)
+    }.first!
+  }
+
+  /// Check if the database with the specified name exists.
+  /// - Parameter dbName: name of the database to check existence
+  /// - Returns: A boolean indicating whether the database exists.
+  public func databaseExists(_ dbName: String) async throws -> Bool {
+    return try await self.listDatabases(pattern: dbName).count > 0
+  }
+}
diff --git a/Sources/SparkConnect/SparkSession.swift 
b/Sources/SparkConnect/SparkSession.swift
index 2fa583c..3b07c27 100644
--- a/Sources/SparkConnect/SparkSession.swift
+++ b/Sources/SparkConnect/SparkSession.swift
@@ -84,6 +84,13 @@ public actor SparkSession {
     }
   }
 
+  /// Interface through which the user may create, drop, alter or query 
underlying databases, tables, functions etc.
+  public var catalog: Catalog {
+    get {
+      return Catalog(spark: self)
+    }
+  }
+
   /// Stop the current client.
   public func stop() async {
     await client.stop()
diff --git a/Sources/SparkConnect/TypeAliases.swift 
b/Sources/SparkConnect/TypeAliases.swift
index f186729..6a700d6 100644
--- a/Sources/SparkConnect/TypeAliases.swift
+++ b/Sources/SparkConnect/TypeAliases.swift
@@ -33,6 +33,7 @@ typealias Limit = Spark_Connect_Limit
 typealias MapType = Spark_Connect_DataType.Map
 typealias NamedTable = Spark_Connect_Read.NamedTable
 typealias OneOf_Analyze = AnalyzePlanRequest.OneOf_Analyze
+typealias OneOf_CatType = Spark_Connect_Catalog.OneOf_CatType
 typealias Plan = Spark_Connect_Plan
 typealias Project = Spark_Connect_Project
 typealias Range = Spark_Connect_Range
diff --git a/Tests/SparkConnectTests/CatalogTests.swift 
b/Tests/SparkConnectTests/CatalogTests.swift
new file mode 100644
index 0000000..f49f2db
--- /dev/null
+++ b/Tests/SparkConnectTests/CatalogTests.swift
@@ -0,0 +1,107 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+import Foundation
+import Testing
+
+@testable import SparkConnect
+
+/// A test suite for `Catalog`
+struct CatalogTests {
+#if !os(Linux)
+  @Test
+  func currentCatalog() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.catalog.currentCatalog() == "spark_catalog")
+    await spark.stop()
+  }
+
+  @Test
+  func setCurrentCatalog() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    try await spark.catalog.setCurrentCatalog("spark_catalog")
+    try await #require(throws: Error.self) {
+      try await spark.catalog.setCurrentCatalog("not_exist_catalog")
+    }
+    await spark.stop()
+  }
+
+  @Test
+  func listCatalogs() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.catalog.listCatalogs() == [CatalogMetadata(name: 
"spark_catalog")])
+    #expect(try await spark.catalog.listCatalogs(pattern: "*") == 
[CatalogMetadata(name: "spark_catalog")])
+    #expect(try await spark.catalog.listCatalogs(pattern: "non_exist").count 
== 0)
+    await spark.stop()
+  }
+
+  @Test
+  func currentDatabase() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.catalog.currentDatabase() == "default")
+    await spark.stop()
+  }
+
+  @Test
+  func setCurrentDatabase() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    try await spark.catalog.setCurrentDatabase("default")
+    try await #require(throws: Error.self) {
+      try await spark.catalog.setCurrentDatabase("not_exist_database")
+    }
+    await spark.stop()
+  }
+
+  @Test
+  func listDatabases() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let dbs = try await spark.catalog.listDatabases()
+    #expect(dbs.count == 1)
+    #expect(dbs[0].name == "default")
+    #expect(dbs[0].catalog == "spark_catalog")
+    #expect(dbs[0].description == "default database")
+    #expect(dbs[0].locationUri.hasSuffix("spark-warehouse"))
+    #expect(try await spark.catalog.listDatabases(pattern: "*") == dbs)
+    #expect(try await spark.catalog.listDatabases(pattern: "non_exist").count 
== 0)
+    await spark.stop()
+  }
+
+  @Test
+  func getDatabase() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let db = try await spark.catalog.getDatabase("default")
+    #expect(db.name == "default")
+    #expect(db.catalog == "spark_catalog")
+    #expect(db.description == "default database")
+    #expect(db.locationUri.hasSuffix("spark-warehouse"))
+    try await #require(throws: Error.self) {
+      try await spark.catalog.getDatabase("not_exist_database")
+    }
+    await spark.stop()
+  }
+
+  @Test
+  func databaseExists() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.catalog.databaseExists("default"))
+    #expect(try await spark.catalog.databaseExists("not_exist_database") == 
false)
+    await spark.stop()
+  }
+#endif
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to