This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a39af894f Spark: Handle concurrently dropped view during CREATE OR 
REPLACE (#9623)
2a39af894f is described below

commit 2a39af894f4f00aa37922ef765cc2583517fa1d1
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Wed Feb 7 08:32:43 2024 +0100

    Spark: Handle concurrently dropped view during CREATE OR REPLACE (#9623)
---
 .../datasources/v2/CreateV2ViewExec.scala          | 73 +++++++++++++---------
 .../org/apache/iceberg/spark/SparkCatalog.java     |  4 +-
 2 files changed, 46 insertions(+), 31 deletions(-)

diff --git 
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
 
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
index 388d391a4c..d2db0f4992 100644
--- 
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
+++ 
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
@@ -21,6 +21,7 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.iceberg.spark.SupportsReplaceView
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException
 import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.catalog.Identifier
@@ -59,45 +60,24 @@ case class CreateV2ViewExec(
       // CREATE OR REPLACE VIEW
       catalog match {
         case c: SupportsReplaceView =>
-          c.replaceView(
-            ident,
-            queryText,
-            currentCatalog,
-            currentNamespace,
-            viewSchema,
-            queryColumnNames.toArray,
-            columnAliases.toArray,
-            columnComments.map(c => c.orNull).toArray,
-            newProperties.asJava)
+          try {
+            replaceView(c, currentCatalog, currentNamespace, newProperties)
+          } catch {
+            // view might have been concurrently dropped during replace
+            case _: NoSuchViewException =>
+              replaceView(c, currentCatalog, currentNamespace, newProperties)
+          }
         case _ =>
           if (catalog.viewExists(ident)) {
             catalog.dropView(ident)
           }
 
-          catalog.createView(
-            ident,
-            queryText,
-            currentCatalog,
-            currentNamespace,
-            viewSchema,
-            queryColumnNames.toArray,
-            columnAliases.toArray,
-            columnComments.map(c => c.orNull).toArray,
-            newProperties.asJava)
+          createView(currentCatalog, currentNamespace, newProperties)
       }
     } else {
       try {
         // CREATE VIEW [IF NOT EXISTS]
-        catalog.createView(
-          ident,
-          queryText,
-          currentCatalog,
-          currentNamespace,
-          viewSchema,
-          queryColumnNames.toArray,
-          columnAliases.toArray,
-          columnComments.map(c => c.orNull).toArray,
-          newProperties.asJava)
+        createView(currentCatalog, currentNamespace, newProperties)
       } catch {
         case _: ViewAlreadyExistsException if allowExisting => // Ignore
       }
@@ -106,6 +86,39 @@ case class CreateV2ViewExec(
     Nil
   }
 
+  private def replaceView(
+    supportsReplaceView: SupportsReplaceView,
+    currentCatalog: String,
+    currentNamespace: Array[String],
+    newProperties: Map[String, String]) = {
+    supportsReplaceView.replaceView(
+      ident,
+      queryText,
+      currentCatalog,
+      currentNamespace,
+      viewSchema,
+      queryColumnNames.toArray,
+      columnAliases.toArray,
+      columnComments.map(c => c.orNull).toArray,
+      newProperties.asJava)
+  }
+
+  private def createView(
+    currentCatalog: String,
+    currentNamespace: Array[String],
+    newProperties: Map[String, String]) = {
+    catalog.createView(
+      ident,
+      queryText,
+      currentCatalog,
+      currentNamespace,
+      viewSchema,
+      queryColumnNames.toArray,
+      columnAliases.toArray,
+      columnComments.map(c => c.orNull).toArray,
+      newProperties.asJava)
+  }
+
   override def simpleString(maxFields: Int): String = {
     s"CreateV2ViewExec: ${ident}"
   }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 0483926c4c..7357a4683b 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -619,7 +619,7 @@ public class SparkCatalog extends BaseCatalog
       String[] columnAliases,
       String[] columnComments,
       Map<String, String> properties)
-      throws NoSuchNamespaceException {
+      throws NoSuchNamespaceException, NoSuchViewException {
     if (null != asViewCatalog) {
       Schema icebergSchema = SparkSchemaUtil.convert(schema);
 
@@ -643,6 +643,8 @@ public class SparkCatalog extends BaseCatalog
         return new SparkView(catalogName, view);
       } catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
         throw new NoSuchNamespaceException(currentNamespace);
+      } catch (org.apache.iceberg.exceptions.NoSuchViewException e) {
+        throw new NoSuchViewException(ident);
       }
     }
 

Reply via email to