This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 2a39af894f Spark: Handle concurrently dropped view during CREATE OR
REPLACE (#9623)
2a39af894f is described below
commit 2a39af894f4f00aa37922ef765cc2583517fa1d1
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Wed Feb 7 08:32:43 2024 +0100
Spark: Handle concurrently dropped view during CREATE OR REPLACE (#9623)
---
.../datasources/v2/CreateV2ViewExec.scala | 73 +++++++++++++---------
.../org/apache/iceberg/spark/SparkCatalog.java | 4 +-
2 files changed, 46 insertions(+), 31 deletions(-)
diff --git
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
index 388d391a4c..d2db0f4992 100644
---
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
+++
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
@@ -21,6 +21,7 @@ package org.apache.spark.sql.execution.datasources.v2
import org.apache.iceberg.spark.SupportsReplaceView
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException
import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.Identifier
@@ -59,45 +60,24 @@ case class CreateV2ViewExec(
// CREATE OR REPLACE VIEW
catalog match {
case c: SupportsReplaceView =>
- c.replaceView(
- ident,
- queryText,
- currentCatalog,
- currentNamespace,
- viewSchema,
- queryColumnNames.toArray,
- columnAliases.toArray,
- columnComments.map(c => c.orNull).toArray,
- newProperties.asJava)
+ try {
+ replaceView(c, currentCatalog, currentNamespace, newProperties)
+ } catch {
+ // view might have been concurrently dropped during replace
+ case _: NoSuchViewException =>
+ replaceView(c, currentCatalog, currentNamespace, newProperties)
+ }
case _ =>
if (catalog.viewExists(ident)) {
catalog.dropView(ident)
}
- catalog.createView(
- ident,
- queryText,
- currentCatalog,
- currentNamespace,
- viewSchema,
- queryColumnNames.toArray,
- columnAliases.toArray,
- columnComments.map(c => c.orNull).toArray,
- newProperties.asJava)
+ createView(currentCatalog, currentNamespace, newProperties)
}
} else {
try {
// CREATE VIEW [IF NOT EXISTS]
- catalog.createView(
- ident,
- queryText,
- currentCatalog,
- currentNamespace,
- viewSchema,
- queryColumnNames.toArray,
- columnAliases.toArray,
- columnComments.map(c => c.orNull).toArray,
- newProperties.asJava)
+ createView(currentCatalog, currentNamespace, newProperties)
} catch {
case _: ViewAlreadyExistsException if allowExisting => // Ignore
}
@@ -106,6 +86,39 @@ case class CreateV2ViewExec(
Nil
}
+ private def replaceView(
+ supportsReplaceView: SupportsReplaceView,
+ currentCatalog: String,
+ currentNamespace: Array[String],
+ newProperties: Map[String, String]) = {
+ supportsReplaceView.replaceView(
+ ident,
+ queryText,
+ currentCatalog,
+ currentNamespace,
+ viewSchema,
+ queryColumnNames.toArray,
+ columnAliases.toArray,
+ columnComments.map(c => c.orNull).toArray,
+ newProperties.asJava)
+ }
+
+ private def createView(
+ currentCatalog: String,
+ currentNamespace: Array[String],
+ newProperties: Map[String, String]) = {
+ catalog.createView(
+ ident,
+ queryText,
+ currentCatalog,
+ currentNamespace,
+ viewSchema,
+ queryColumnNames.toArray,
+ columnAliases.toArray,
+ columnComments.map(c => c.orNull).toArray,
+ newProperties.asJava)
+ }
+
override def simpleString(maxFields: Int): String = {
s"CreateV2ViewExec: ${ident}"
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 0483926c4c..7357a4683b 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -619,7 +619,7 @@ public class SparkCatalog extends BaseCatalog
String[] columnAliases,
String[] columnComments,
Map<String, String> properties)
- throws NoSuchNamespaceException {
+ throws NoSuchNamespaceException, NoSuchViewException {
if (null != asViewCatalog) {
Schema icebergSchema = SparkSchemaUtil.convert(schema);
@@ -643,6 +643,8 @@ public class SparkCatalog extends BaseCatalog
return new SparkView(catalogName, view);
} catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
throw new NoSuchNamespaceException(currentNamespace);
+ } catch (org.apache.iceberg.exceptions.NoSuchViewException e) {
+ throw new NoSuchViewException(ident);
}
}