This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new b0ba4a0  [SPARK-52320] Add 
`ColumnNotFound/InvalidViewName/TableOrViewAlreadyExists` to `SparkConnectError`
b0ba4a0 is described below

commit b0ba4a0cd1c67ffc6b5f7b30560fcbd08ac3ae23
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Mon May 26 22:15:00 2025 -0700

    [SPARK-52320] Add `ColumnNotFound/InvalidViewName/TableOrViewAlreadyExists` 
to `SparkConnectError`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to add `ColumnNotFound`, `InvalidViewName`, and 
`TableOrViewAlreadyExists` to `SparkConnectError`.
    
    ### Why are the changes needed?
    
    To allow users to catch these exceptions easily instead of matching 
`internalError` with string patterns.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Some errors previously surfaced as a generic `internalError` are now thrown as more specific `SparkConnectError` cases.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #179 from dongjoon-hyun/SPARK-52320.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 Sources/SparkConnect/DataFrame.swift               |  2 +
 Sources/SparkConnect/SparkConnectClient.swift      | 15 +++++-
 Sources/SparkConnect/SparkConnectError.swift       |  3 ++
 Tests/SparkConnectTests/CatalogTests.swift         | 12 ++---
 Tests/SparkConnectTests/DataFrameTests.swift       |  4 +-
 Tests/SparkConnectTests/DataFrameWriterTests.swift |  5 +-
 .../SparkConnectTests/DataFrameWriterV2Tests.swift |  2 +-
 Tests/SparkConnectTests/MergeIntoWriterTests.swift | 57 ++++++++++++++++------
 8 files changed, 72 insertions(+), 28 deletions(-)

diff --git a/Sources/SparkConnect/DataFrame.swift 
b/Sources/SparkConnect/DataFrame.swift
index eda8bda..fa1eb48 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -309,6 +309,8 @@ public actor DataFrame: Sendable {
           throw SparkConnectError.SchemaNotFound
         case let m where m.contains("TABLE_OR_VIEW_NOT_FOUND"):
           throw SparkConnectError.TableOrViewNotFound
+        case let m where m.contains("UNRESOLVED_COLUMN.WITH_SUGGESTION"):
+          throw SparkConnectError.ColumnNotFound
         default:
           throw error
         }
diff --git a/Sources/SparkConnect/SparkConnectClient.swift 
b/Sources/SparkConnect/SparkConnectClient.swift
index 93889ce..c9fc5b0 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -127,7 +127,20 @@ public actor SparkConnectClient {
       ),
       interceptors: self.intercepters
     ) { client in
-      return try await f(client)
+      do {
+        return try await f(client)
+      } catch let error as RPCError where error.code == .internalError {
+        switch error.message {
+        case let m where m.contains("TABLE_OR_VIEW_ALREADY_EXISTS"):
+          throw SparkConnectError.TableOrViewAlreadyExists
+        case let m where m.contains("TABLE_OR_VIEW_NOT_FOUND"):
+          throw SparkConnectError.TableOrViewNotFound
+        case let m where m.contains("Invalid view name:"):
+          throw SparkConnectError.InvalidViewName
+        default:
+          throw error
+        }
+      }
     }
   }
 
diff --git a/Sources/SparkConnect/SparkConnectError.swift 
b/Sources/SparkConnect/SparkConnectError.swift
index c300075..f235859 100644
--- a/Sources/SparkConnect/SparkConnectError.swift
+++ b/Sources/SparkConnect/SparkConnectError.swift
@@ -20,10 +20,13 @@
 /// A enum for ``SparkConnect`` package errors
 public enum SparkConnectError: Error {
   case CatalogNotFound
+  case ColumnNotFound
   case InvalidArgument
   case InvalidSessionID
   case InvalidType
+  case InvalidViewName
   case SchemaNotFound
+  case TableOrViewAlreadyExists
   case TableOrViewNotFound
   case UnsupportedOperation
 }
diff --git a/Tests/SparkConnectTests/CatalogTests.swift 
b/Tests/SparkConnectTests/CatalogTests.swift
index 30f91b0..b63fd35 100644
--- a/Tests/SparkConnectTests/CatalogTests.swift
+++ b/Tests/SparkConnectTests/CatalogTests.swift
@@ -205,12 +205,12 @@ struct CatalogTests {
       try await spark.range(1).createTempView(viewName)
       #expect(try await spark.catalog.tableExists(viewName))
 
-      try await #require(throws: Error.self) {
+      try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) {
         try await spark.range(1).createTempView(viewName)
       }
     })
 
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.InvalidViewName) {
       try await spark.range(1).createTempView("invalid view name")
     }
 
@@ -228,7 +228,7 @@ struct CatalogTests {
       try await spark.range(1).createOrReplaceTempView(viewName)
     })
 
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.InvalidViewName) {
       try await spark.range(1).createOrReplaceTempView("invalid view name")
     }
 
@@ -244,13 +244,13 @@ struct CatalogTests {
       try await spark.range(1).createGlobalTempView(viewName)
       #expect(try await spark.catalog.tableExists("global_temp.\(viewName)"))
 
-      try await #require(throws: Error.self) {
+      try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) {
         try await spark.range(1).createGlobalTempView(viewName)
       }
     })
     #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") == 
false)
 
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.InvalidViewName) {
       try await spark.range(1).createGlobalTempView("invalid view name")
     }
 
@@ -269,7 +269,7 @@ struct CatalogTests {
     })
     #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") == 
false)
 
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.InvalidViewName) {
       try await spark.range(1).createOrReplaceGlobalTempView("invalid view 
name")
     }
 
diff --git a/Tests/SparkConnectTests/DataFrameTests.swift 
b/Tests/SparkConnectTests/DataFrameTests.swift
index 15cc6d6..f5c6eeb 100644
--- a/Tests/SparkConnectTests/DataFrameTests.swift
+++ b/Tests/SparkConnectTests/DataFrameTests.swift
@@ -235,10 +235,10 @@ struct DataFrameTests {
   @Test
   func selectInvalidColumn() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.ColumnNotFound) {
       try await spark.range(1).select("invalid").schema
     }
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.ColumnNotFound) {
       try await spark.range(1).select("id + 1").schema
     }
     await spark.stop()
diff --git a/Tests/SparkConnectTests/DataFrameWriterTests.swift 
b/Tests/SparkConnectTests/DataFrameWriterTests.swift
index 3ad1234..5228667 100644
--- a/Tests/SparkConnectTests/DataFrameWriterTests.swift
+++ b/Tests/SparkConnectTests/DataFrameWriterTests.swift
@@ -112,7 +112,7 @@ struct DataFrameWriterTests {
       try await spark.range(1).write.saveAsTable(tableName)
       #expect(try await spark.read.table(tableName).count() == 1)
 
-      try await #require(throws: Error.self) {
+      try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) {
         try await spark.range(1).write.saveAsTable(tableName)
       }
 
@@ -130,8 +130,7 @@ struct DataFrameWriterTests {
     let spark = try await SparkSession.builder.getOrCreate()
     let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", 
with: "")
     try await SQLHelper.withTable(spark, tableName)({
-      // Table doesn't exist.
-      try await #require(throws: Error.self) {
+      try await #require(throws: SparkConnectError.TableOrViewNotFound) {
         try await spark.range(1).write.insertInto(tableName)
       }
 
diff --git a/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift 
b/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift
index 938caa8..60f47c2 100644
--- a/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift
+++ b/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift
@@ -34,7 +34,7 @@ struct DataFrameWriterV2Tests {
       let write = try await spark.range(2).writeTo(tableName).using("orc")
       try await write.create()
       #expect(try await spark.table(tableName).count() == 2)
-      try await #require(throws: Error.self) {
+      try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) {
         try await write.create()
       }
     })
diff --git a/Tests/SparkConnectTests/MergeIntoWriterTests.swift 
b/Tests/SparkConnectTests/MergeIntoWriterTests.swift
index d7d693d..9881653 100644
--- a/Tests/SparkConnectTests/MergeIntoWriterTests.swift
+++ b/Tests/SparkConnectTests/MergeIntoWriterTests.swift
@@ -31,11 +31,20 @@ struct MergeIntoWriterTests {
     let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", 
with: "")
     try await SQLHelper.withTable(spark, tableName)({
       let mergeInto = try await spark.range(1).mergeInto(tableName, "true")
-      try await #require(throws: Error.self) {
-        try await mergeInto.whenMatched().delete().merge()
-      }
-      try await #require(throws: Error.self) {
-        try await mergeInto.whenMatched("true").delete().merge()
+      if await spark.version >= "4.0.0" {
+        try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+          try await mergeInto.whenMatched().delete().merge()
+        }
+        try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+          try await mergeInto.whenMatched("true").delete().merge()
+        }
+      } else {
+        try await #require(throws: Error.self) {
+          try await mergeInto.whenMatched().delete().merge()
+        }
+        try await #require(throws: Error.self) {
+          try await mergeInto.whenMatched("true").delete().merge()
+        }
       }
     })
     await spark.stop()
@@ -47,11 +56,20 @@ struct MergeIntoWriterTests {
     let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", 
with: "")
     try await SQLHelper.withTable(spark, tableName)({
       let mergeInto = try await spark.range(1).mergeInto(tableName, "true")
-      try await #require(throws: Error.self) {
-        try await mergeInto.whenNotMatched().insertAll().merge()
-      }
-      try await #require(throws: Error.self) {
-        try await mergeInto.whenNotMatched("true").insertAll().merge()
+      if await spark.version >= "4.0.0" {
+        try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+          try await mergeInto.whenNotMatched().insertAll().merge()
+        }
+        try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+          try await mergeInto.whenNotMatched("true").insertAll().merge()
+        }
+      } else {
+        try await #require(throws: Error.self) {
+          try await mergeInto.whenNotMatched().insertAll().merge()
+        }
+        try await #require(throws: Error.self) {
+          try await mergeInto.whenNotMatched("true").insertAll().merge()
+        }
       }
     })
     await spark.stop()
@@ -63,11 +81,20 @@ struct MergeIntoWriterTests {
     let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", 
with: "")
     try await SQLHelper.withTable(spark, tableName)({
       let mergeInto = try await spark.range(1).mergeInto(tableName, "true")
-      try await #require(throws: Error.self) {
-        try await mergeInto.whenNotMatchedBySource().delete().merge()
-      }
-      try await #require(throws: Error.self) {
-        try await mergeInto.whenNotMatchedBySource("true").delete().merge()
+      if await spark.version >= "4.0.0" {
+        try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+          try await mergeInto.whenNotMatchedBySource().delete().merge()
+        }
+        try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+          try await mergeInto.whenNotMatchedBySource("true").delete().merge()
+        }
+      } else {
+        try await #require(throws: Error.self) {
+          try await mergeInto.whenNotMatchedBySource().delete().merge()
+        }
+        try await #require(throws: Error.self) {
+          try await mergeInto.whenNotMatchedBySource("true").delete().merge()
+        }
       }
     })
     await spark.stop()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to