This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new a60ee5d683 Spark: Support renaming views (#9343)
a60ee5d683 is described below

commit a60ee5d6833c648ddc7d3d0d7f52f6c705e48d63
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Tue Jan 16 11:42:25 2024 +0100

    Spark: Support renaming views (#9343)
---
 .../spark/sql/catalyst/analysis/ResolveViews.scala |  16 +++
 .../plans/logical/views/ResolvedV2View.scala       |  31 ++++
 .../v2/ExtendedDataSourceV2Strategy.scala          |  12 ++
 .../datasources/v2/RenameV2ViewExec.scala          |  45 ++++++
 .../apache/iceberg/spark/extensions/TestViews.java | 157 ++++++++++++++++++++-
 .../org/apache/iceberg/spark/SparkCatalog.java     |  14 +-
 6 files changed, 269 insertions(+), 6 deletions(-)

diff --git 
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
 
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
index a978b94f49..19522d939b 100644
--- 
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
+++ 
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.catalyst.trees.Origin
@@ -53,6 +54,11 @@ case class ResolveViews(spark: SparkSession) extends 
Rule[LogicalPlan] with Look
       loadView(catalog, ident)
         .map(createViewRelation(parts, _))
         .getOrElse(u)
+
+    case u@UnresolvedTableOrView(CatalogAndIdentifier(catalog, ident), _, _) =>
+      loadView(catalog, ident)
+        .map(_ => ResolvedV2View(catalog.asViewCatalog, ident))
+        .getOrElse(u)
   }
 
   def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = 
catalog match {
@@ -143,4 +149,14 @@ case class ResolveViews(spark: SparkSession) extends 
Rule[LogicalPlan] with Look
   private def isBuiltinFunction(name: String): Boolean = {
     
spark.sessionState.catalogManager.v1SessionCatalog.isBuiltinFunction(FunctionIdentifier(name))
   }
+
+
+  // Narrows a CatalogPlugin to ViewCatalog for view commands; fails analysis
+  // with a "missing catalog ability" error when the plugin cannot handle views.
+  implicit class ViewHelper(plugin: CatalogPlugin) {
+    def asViewCatalog: ViewCatalog = plugin match {
+      case viewCatalog: ViewCatalog =>
+        viewCatalog
+      case _ =>
+        throw QueryCompilationErrors.missingCatalogAbilityError(plugin, 
"views")
+    }
+  }
 }
diff --git 
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ResolvedV2View.scala
 
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ResolvedV2View.scala
new file mode 100644
index 0000000000..b9c05ff006
--- /dev/null
+++ 
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ResolvedV2View.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.views
+
+import org.apache.spark.sql.catalyst.analysis.LeafNodeWithoutStats
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+
+/**
+ * Logical plan node representing a V2 view that has been resolved against a
+ * [[ViewCatalog]]. Produces no output attributes; it serves as the resolved
+ * placeholder that view commands (e.g. RENAME, see the planner strategy)
+ * match on.
+ */
+case class ResolvedV2View(
+  catalog: ViewCatalog,
+  identifier: Identifier) extends LeafNodeWithoutStats {
+  override def output: Seq[Attribute] = Nil
+}
diff --git 
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
 
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index 6302d8307a..05307d8bf3 100644
--- 
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ 
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -22,6 +22,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import org.apache.iceberg.spark.Spark3Util
 import org.apache.iceberg.spark.SparkCatalog
 import org.apache.iceberg.spark.SparkSessionCatalog
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.Strategy
 import org.apache.spark.sql.catalyst.InternalRow
@@ -38,11 +39,14 @@ import 
org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
 import org.apache.spark.sql.catalyst.plans.logical.DropTag
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce
+import org.apache.spark.sql.catalyst.plans.logical.RenameTable
 import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
 import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
 import 
org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
+import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.connector.catalog.ViewCatalog
 import org.apache.spark.sql.execution.OrderAwareCoalesceExec
 import org.apache.spark.sql.execution.SparkPlan
 import scala.jdk.CollectionConverters._
@@ -90,6 +94,14 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) 
extends Strategy wi
     case OrderAwareCoalesce(numPartitions, coalescer, child) =>
       OrderAwareCoalesceExec(numPartitions, coalescer, planLater(child)) :: Nil
 
+    case RenameTable(ResolvedV2View(oldCatalog: ViewCatalog, oldIdent), 
newName, isView@true) =>
+      val newIdent = Spark3Util.catalogAndIdentifier(spark, 
newName.toList.asJava)
+      if (oldCatalog.name != newIdent.catalog().name()) {
+        throw new AnalysisException(
+          s"Cannot move view between catalogs: from=${oldCatalog.name} and 
to=${newIdent.catalog().name()}")
+      }
+      RenameV2ViewExec(oldCatalog, oldIdent, newIdent.identifier()) :: Nil
+
     case _ => Nil
   }
 
diff --git 
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameV2ViewExec.scala
 
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameV2ViewExec.scala
new file mode 100644
index 0000000000..61d362044c
--- /dev/null
+++ 
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameV2ViewExec.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+
+
+/**
+ * Physical plan node that renames a V2 view by delegating to the view
+ * catalog's `renameView` API. Produces no rows.
+ */
+case class RenameV2ViewExec(
+  catalog: ViewCatalog,
+  oldIdent: Identifier,
+  newIdent: Identifier) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    catalog.renameView(oldIdent, newIdent)
+
+    Seq.empty
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    // Fixed: the second placeholder was written as "{newIdent}" without the
+    // '$', so the literal text "{newIdent}" appeared in plan output instead
+    // of the interpolated target identifier.
+    s"RenameV2View ${oldIdent} to ${newIdent}"
+  }
+}
diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
index 94e86e5ee3..9b179a6d11 100644
--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
+++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
@@ -23,6 +23,7 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -590,10 +591,7 @@ public class TestViews extends SparkExtensionsTestBase {
   @Test
   public void fullFunctionIdentifierNotRewrittenLoadFailure() {
     String viewName = "fullFunctionIdentifierNotRewrittenLoadFailure";
-    String sql =
-        String.format(
-            "SELECT spark_catalog.system.bucket(100, 'a') AS bucket_result, 
'a' AS value",
-            catalogName);
+    String sql = "SELECT spark_catalog.system.bucket(100, 'a') AS 
bucket_result, 'a' AS value";
 
     // avoid namespace failures
     sql("USE spark_catalog");
@@ -635,6 +633,157 @@ public class TestViews extends SparkExtensionsTestBase {
     return Spark3Util.loadIcebergCatalog(spark, catalogName);
   }
 
+  @Test
+  public void renameView() throws NoSuchTableException {
+    insertRows(10);
+    String viewName = viewName("originalView");
+    String renamedView = viewName("renamedView");
+    String sql = String.format("SELECT id FROM %s", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, viewName))
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView);
+
+    List<Object[]> expected =
+        IntStream.rangeClosed(1, 
10).mapToObj(this::row).collect(Collectors.toList());
+    assertThat(sql("SELECT * FROM %s", renamedView))
+        .hasSize(10)
+        .containsExactlyInAnyOrderElementsOf(expected);
+  }
+
+  @Test
+  public void renameViewHiddenByTempView() throws NoSuchTableException {
+    insertRows(10);
+    String viewName = viewName("originalView");
+    String renamedView = viewName("renamedView");
+    String sql = String.format("SELECT id FROM %s WHERE id > 5", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", 
viewName, tableName);
+
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, viewName))
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    // renames the TEMP VIEW
+    sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView);
+    assertThat(sql("SELECT * FROM %s", renamedView))
+        .hasSize(5)
+        .containsExactlyInAnyOrderElementsOf(
+            IntStream.rangeClosed(1, 
5).mapToObj(this::row).collect(Collectors.toList()));
+
+    // original view still exists with its name
+    assertThat(viewCatalog.viewExists(TableIdentifier.of(NAMESPACE, 
viewName))).isTrue();
+    assertThat(viewCatalog.viewExists(TableIdentifier.of(NAMESPACE, 
renamedView))).isFalse();
+    assertThat(sql("SELECT * FROM %s", viewName))
+        .hasSize(5)
+        .containsExactlyInAnyOrderElementsOf(
+            IntStream.rangeClosed(6, 
10).mapToObj(this::row).collect(Collectors.toList()));
+
+    // will rename the Iceberg view
+    sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView);
+    assertThat(viewCatalog.viewExists(TableIdentifier.of(NAMESPACE, 
renamedView))).isTrue();
+  }
+
+  @Test
+  public void renameViewToDifferentTargetCatalog() {
+    String viewName = viewName("originalView");
+    String renamedView = viewName("renamedView");
+    String sql = String.format("SELECT id FROM %s", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, viewName))
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO spark_catalog.%s", 
viewName, renamedView))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            "Cannot move view between catalogs: from=spark_with_views and 
to=spark_catalog");
+  }
+
+  @Test
+  public void renameNonExistingView() {
+    assertThatThrownBy(() -> sql("ALTER VIEW non_existing RENAME TO target"))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("The table or view `non_existing` cannot be 
found");
+  }
+
+  @Test
+  public void renameViewTargetAlreadyExistsAsView() {
+    String viewName = viewName("renameViewSource");
+    String target = viewName("renameViewTarget");
+    String sql = String.format("SELECT id FROM %s", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, viewName))
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, target))
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO %s", viewName, 
target))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            String.format("Cannot create view default.%s because it already 
exists", target));
+  }
+
+  @Test
+  public void renameViewTargetAlreadyExistsAsTable() {
+    String viewName = viewName("renameViewSource");
+    String target = viewName("renameViewTarget");
+    String sql = String.format("SELECT id FROM %s", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, viewName))
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    sql("CREATE TABLE %s (id INT, data STRING)", target);
+    assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO %s", viewName, 
target))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            String.format("Cannot create view default.%s because it already 
exists", target));
+  }
+
+  // Appends a random numeric suffix (0..999999) so each test gets a unique
+  // view name and runs do not collide on leftover catalog state.
+  private String viewName(String viewName) {
+    return viewName + new Random().nextInt(1000000);
+  }
+
   private void insertRows(int numRows) throws NoSuchTableException {
     List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows);
     for (int i = 1; i <= numRows; i++) {
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 01804e0108..82665194df 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -588,8 +588,18 @@ public class SparkCatalog extends BaseCatalog
   @Override
   public void renameView(Identifier fromIdentifier, Identifier toIdentifier)
       throws NoSuchViewException, ViewAlreadyExistsException {
-    throw new UnsupportedOperationException(
-        "Renaming a view is not supported by catalog: " + catalogName);
+    if (null != asViewCatalog) {
+      try {
+        asViewCatalog.renameView(buildIdentifier(fromIdentifier), 
buildIdentifier(toIdentifier));
+      } catch (org.apache.iceberg.exceptions.NoSuchViewException e) {
+        throw new NoSuchViewException(fromIdentifier);
+      } catch (org.apache.iceberg.exceptions.AlreadyExistsException e) {
+        throw new ViewAlreadyExistsException(toIdentifier);
+      }
+    } else {
+      throw new UnsupportedOperationException(
+          "Renaming a view is not supported by catalog: " + catalogName);
+    }
   }
 
   @Override

Reply via email to