This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push:
     new b0ba4a0  [SPARK-52320] Add `ColumnNotFound/InvalidViewName/TableOrViewAlreadyExists` to `SparkConnectError`
b0ba4a0 is described below

commit b0ba4a0cd1c67ffc6b5f7b30560fcbd08ac3ae23
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Mon May 26 22:15:00 2025 -0700

    [SPARK-52320] Add `ColumnNotFound/InvalidViewName/TableOrViewAlreadyExists` to `SparkConnectError`

    ### What changes were proposed in this pull request?

    This PR aims to add `ColumnNotFound`, `InvalidViewName`, and `TableOrViewAlreadyExists` to `SparkConnectError`.

    ### Why are the changes needed?

    To allow users to catch these exceptions easily instead of matching `internalError` against string patterns.

    ### Does this PR introduce _any_ user-facing change?

    Yes, but these are more specific exceptions than before.

    ### How was this patch tested?

    Pass the CIs.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #179 from dongjoon-hyun/SPARK-52320.

    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 Sources/SparkConnect/DataFrame.swift               |  2 +
 Sources/SparkConnect/SparkConnectClient.swift      | 15 +++++-
 Sources/SparkConnect/SparkConnectError.swift       |  3 ++
 Tests/SparkConnectTests/CatalogTests.swift         | 12 ++---
 Tests/SparkConnectTests/DataFrameTests.swift       |  4 +-
 Tests/SparkConnectTests/DataFrameWriterTests.swift |  5 +-
 .../SparkConnectTests/DataFrameWriterV2Tests.swift |  2 +-
 Tests/SparkConnectTests/MergeIntoWriterTests.swift | 57 ++++++++++++++++------
 8 files changed, 72 insertions(+), 28 deletions(-)

diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift
index eda8bda..fa1eb48 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -309,6 +309,8 @@ public actor DataFrame: Sendable {
         throw SparkConnectError.SchemaNotFound
       case let m where m.contains("TABLE_OR_VIEW_NOT_FOUND"):
         throw SparkConnectError.TableOrViewNotFound
+      case let m where m.contains("UNRESOLVED_COLUMN.WITH_SUGGESTION"):
+        throw SparkConnectError.ColumnNotFound
       default:
         throw error
       }
diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift
index 93889ce..c9fc5b0 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -127,7 +127,20 @@ public actor SparkConnectClient {
       ),
       interceptors: self.intercepters
     ) { client in
-      return try await f(client)
+      do {
+        return try await f(client)
+      } catch let error as RPCError where error.code == .internalError {
+        switch error.message {
+        case let m where m.contains("TABLE_OR_VIEW_ALREADY_EXISTS"):
+          throw SparkConnectError.TableOrViewAlreadyExists
+        case let m where m.contains("TABLE_OR_VIEW_NOT_FOUND"):
+          throw SparkConnectError.TableOrViewNotFound
+        case let m where m.contains("Invalid view name:"):
+          throw SparkConnectError.InvalidViewName
+        default:
+          throw error
+        }
+      }
     }
   }

diff --git a/Sources/SparkConnect/SparkConnectError.swift b/Sources/SparkConnect/SparkConnectError.swift
index c300075..f235859 100644
--- a/Sources/SparkConnect/SparkConnectError.swift
+++ b/Sources/SparkConnect/SparkConnectError.swift
@@ -20,10 +20,13 @@
 /// A enum for ``SparkConnect`` package errors
 public enum SparkConnectError: Error {
   case CatalogNotFound
+  case ColumnNotFound
   case InvalidArgument
   case InvalidSessionID
   case InvalidType
+  case InvalidViewName
   case SchemaNotFound
+  case TableOrViewAlreadyExists
   case TableOrViewNotFound
   case UnsupportedOperation
 }
diff --git a/Tests/SparkConnectTests/CatalogTests.swift b/Tests/SparkConnectTests/CatalogTests.swift
index 30f91b0..b63fd35 100644
--- a/Tests/SparkConnectTests/CatalogTests.swift
+++ b/Tests/SparkConnectTests/CatalogTests.swift
@@ -205,12 +205,12 @@ struct CatalogTests {
       try await spark.range(1).createTempView(viewName)
       #expect(try await spark.catalog.tableExists(viewName))

-      try await #require(throws: Error.self) {
+      try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) {
        try await spark.range(1).createTempView(viewName)
       }
     })

-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.InvalidViewName) {
       try await spark.range(1).createTempView("invalid view name")
     }

@@ -228,7 +228,7 @@ struct CatalogTests {
       try await spark.range(1).createOrReplaceTempView(viewName)
     })

-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.InvalidViewName) {
       try await spark.range(1).createOrReplaceTempView("invalid view name")
     }

@@ -244,13 +244,13 @@ struct CatalogTests {
       try await spark.range(1).createGlobalTempView(viewName)
       #expect(try await spark.catalog.tableExists("global_temp.\(viewName)"))

-      try await #require(throws: Error.self) {
+      try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) {
        try await spark.range(1).createGlobalTempView(viewName)
       }
     })
     #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") == false)

-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.InvalidViewName) {
       try await spark.range(1).createGlobalTempView("invalid view name")
     }

@@ -269,7 +269,7 @@ struct CatalogTests {
     })
     #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") == false)

-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.InvalidViewName) {
       try await spark.range(1).createOrReplaceGlobalTempView("invalid view name")
     }

diff --git a/Tests/SparkConnectTests/DataFrameTests.swift b/Tests/SparkConnectTests/DataFrameTests.swift
index 15cc6d6..f5c6eeb 100644
--- a/Tests/SparkConnectTests/DataFrameTests.swift
+++ b/Tests/SparkConnectTests/DataFrameTests.swift
@@ -235,10 +235,10 @@ struct DataFrameTests {
   @Test
   func selectInvalidColumn() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.ColumnNotFound) {
       try await spark.range(1).select("invalid").schema
     }
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.ColumnNotFound) {
       try await spark.range(1).select("id + 1").schema
     }
     await spark.stop()
diff --git a/Tests/SparkConnectTests/DataFrameWriterTests.swift b/Tests/SparkConnectTests/DataFrameWriterTests.swift
index 3ad1234..5228667 100644
--- a/Tests/SparkConnectTests/DataFrameWriterTests.swift
+++ b/Tests/SparkConnectTests/DataFrameWriterTests.swift
@@ -112,7 +112,7 @@ struct DataFrameWriterTests {
       try await spark.range(1).write.saveAsTable(tableName)
       #expect(try await spark.read.table(tableName).count() == 1)

-      try await #require(throws: Error.self) {
+      try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) {
        try await spark.range(1).write.saveAsTable(tableName)
       }

@@ -130,8 +130,7 @@ struct DataFrameWriterTests {
     let spark = try await SparkSession.builder.getOrCreate()
     let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
     try await SQLHelper.withTable(spark, tableName)({
-      // Table doesn't exist.
-      try await #require(throws: Error.self) {
+      try await #require(throws: SparkConnectError.TableOrViewNotFound) {
        try await spark.range(1).write.insertInto(tableName)
       }

diff --git a/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift b/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift
index 938caa8..60f47c2 100644
--- a/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift
+++ b/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift
@@ -34,7 +34,7 @@ struct DataFrameWriterV2Tests {
       let write = try await spark.range(2).writeTo(tableName).using("orc")
       try await write.create()
       #expect(try await spark.table(tableName).count() == 2)
-      try await #require(throws: Error.self) {
+      try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) {
        try await write.create()
       }
     })
diff --git a/Tests/SparkConnectTests/MergeIntoWriterTests.swift b/Tests/SparkConnectTests/MergeIntoWriterTests.swift
index d7d693d..9881653 100644
--- a/Tests/SparkConnectTests/MergeIntoWriterTests.swift
+++ b/Tests/SparkConnectTests/MergeIntoWriterTests.swift
@@ -31,11 +31,20 @@ struct MergeIntoWriterTests {
     let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
     try await SQLHelper.withTable(spark, tableName)({
       let mergeInto = try await spark.range(1).mergeInto(tableName, "true")
-      try await #require(throws: Error.self) {
-        try await mergeInto.whenMatched().delete().merge()
-      }
-      try await #require(throws: Error.self) {
-        try await mergeInto.whenMatched("true").delete().merge()
+      if await spark.version >= "4.0.0" {
+        try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+          try await mergeInto.whenMatched().delete().merge()
+        }
+        try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+          try await mergeInto.whenMatched("true").delete().merge()
+        }
+      } else {
+        try await #require(throws: Error.self) {
+          try await mergeInto.whenMatched().delete().merge()
+        }
+        try await #require(throws: Error.self) {
+          try await mergeInto.whenMatched("true").delete().merge()
+        }
       }
     })
     await spark.stop()
@@ -47,11 +56,20 @@ struct MergeIntoWriterTests {
     let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
     try await SQLHelper.withTable(spark, tableName)({
       let mergeInto = try await spark.range(1).mergeInto(tableName, "true")
-      try await #require(throws: Error.self) {
-        try await mergeInto.whenNotMatched().insertAll().merge()
-      }
-      try await #require(throws: Error.self) {
-        try await mergeInto.whenNotMatched("true").insertAll().merge()
+      if await spark.version >= "4.0.0" {
+        try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+          try await mergeInto.whenNotMatched().insertAll().merge()
+        }
+        try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+          try await mergeInto.whenNotMatched("true").insertAll().merge()
+        }
+      } else {
+        try await #require(throws: Error.self) {
+          try await mergeInto.whenNotMatched().insertAll().merge()
+        }
+        try await #require(throws: Error.self) {
+          try await mergeInto.whenNotMatched("true").insertAll().merge()
+        }
       }
     })
     await spark.stop()
@@ -63,11 +81,20 @@ struct MergeIntoWriterTests {
     let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
     try await SQLHelper.withTable(spark, tableName)({
       let mergeInto = try await spark.range(1).mergeInto(tableName, "true")
-      try await #require(throws: Error.self) {
-        try await mergeInto.whenNotMatchedBySource().delete().merge()
-      }
-      try await #require(throws: Error.self) {
-        try await mergeInto.whenNotMatchedBySource("true").delete().merge()
+      if await spark.version >= "4.0.0" {
+        try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+          try await mergeInto.whenNotMatchedBySource().delete().merge()
+        }
+        try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+          try await mergeInto.whenNotMatchedBySource("true").delete().merge()
+        }
+      } else {
+        try await #require(throws: Error.self) {
+          try await mergeInto.whenNotMatchedBySource().delete().merge()
+        }
+        try await #require(throws: Error.self) {
+          try await mergeInto.whenNotMatchedBySource("true").delete().merge()
+        }
       }
     })
     await spark.stop()
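
With the cases above in place, calling code can pattern-match on `SparkConnectError` directly instead of inspecting the message text of an RPC `internalError`. Below is a minimal usage sketch, not part of the commit: the helper name and table name are illustrative assumptions, while the `SparkSession`, `DataFrame`, and `SparkConnectError` calls come from this repository.

import SparkConnect

// Hypothetical helper showing the more specific error handling introduced here.
// `createOnce` and the table name are illustrative assumptions.
func createOnce(_ tableName: String) async throws {
  let spark = try await SparkSession.builder.getOrCreate()
  do {
    try await spark.range(1).write.saveAsTable(tableName)
  } catch SparkConnectError.TableOrViewAlreadyExists {
    // Previously this surfaced as an `internalError` whose message contained
    // "TABLE_OR_VIEW_ALREADY_EXISTS" and had to be string-matched by the caller.
    print("\(tableName) already exists; skipping creation.")
  }
  do {
    _ = try await spark.range(1).select("no_such_column").schema
  } catch SparkConnectError.ColumnNotFound {
    print("Unknown column.")
  }
  await spark.stop()
}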