This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new a60ee5d683 Spark: Support renaming views (#9343)
a60ee5d683 is described below
commit a60ee5d6833c648ddc7d3d0d7f52f6c705e48d63
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Tue Jan 16 11:42:25 2024 +0100
Spark: Support renaming views (#9343)
---
.../spark/sql/catalyst/analysis/ResolveViews.scala | 16 +++
.../plans/logical/views/ResolvedV2View.scala | 31 ++++
.../v2/ExtendedDataSourceV2Strategy.scala | 12 ++
.../datasources/v2/RenameV2ViewExec.scala | 45 ++++++
.../apache/iceberg/spark/extensions/TestViews.java | 157 ++++++++++++++++++++-
.../org/apache/iceberg/spark/SparkCatalog.java | 14 +-
6 files changed, 269 insertions(+), 6 deletions(-)
diff --git
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
index a978b94f49..19522d939b 100644
---
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
+++
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.catalyst.trees.Origin
@@ -53,6 +54,11 @@ case class ResolveViews(spark: SparkSession) extends
Rule[LogicalPlan] with Look
loadView(catalog, ident)
.map(createViewRelation(parts, _))
.getOrElse(u)
+
+ case u@UnresolvedTableOrView(CatalogAndIdentifier(catalog, ident), _, _) =>
+ loadView(catalog, ident)
+ .map(_ => ResolvedV2View(catalog.asViewCatalog, ident))
+ .getOrElse(u)
}
def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] =
catalog match {
@@ -143,4 +149,14 @@ case class ResolveViews(spark: SparkSession) extends
Rule[LogicalPlan] with Look
private def isBuiltinFunction(name: String): Boolean = {
spark.sessionState.catalogManager.v1SessionCatalog.isBuiltinFunction(FunctionIdentifier(name))
}
+
+
+ implicit class ViewHelper(plugin: CatalogPlugin) {
+ def asViewCatalog: ViewCatalog = plugin match {
+ case viewCatalog: ViewCatalog =>
+ viewCatalog
+ case _ =>
+ throw QueryCompilationErrors.missingCatalogAbilityError(plugin,
"views")
+ }
+ }
}
diff --git
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ResolvedV2View.scala
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ResolvedV2View.scala
new file mode 100644
index 0000000000..b9c05ff006
--- /dev/null
+++
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ResolvedV2View.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.views
+
+import org.apache.spark.sql.catalyst.analysis.LeafNodeWithoutStats
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+
+case class ResolvedV2View(
+ catalog: ViewCatalog,
+ identifier: Identifier) extends LeafNodeWithoutStats {
+ override def output: Seq[Attribute] = Nil
+}
diff --git
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index 6302d8307a..05307d8bf3 100644
---
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -22,6 +22,7 @@ package org.apache.spark.sql.execution.datasources.v2
import org.apache.iceberg.spark.Spark3Util
import org.apache.iceberg.spark.SparkCatalog
import org.apache.iceberg.spark.SparkSessionCatalog
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.InternalRow
@@ -38,11 +39,14 @@ import
org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
import org.apache.spark.sql.catalyst.plans.logical.DropTag
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce
+import org.apache.spark.sql.catalyst.plans.logical.RenameTable
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
import
org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
+import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.execution.OrderAwareCoalesceExec
import org.apache.spark.sql.execution.SparkPlan
import scala.jdk.CollectionConverters._
@@ -90,6 +94,14 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession)
extends Strategy wi
case OrderAwareCoalesce(numPartitions, coalescer, child) =>
OrderAwareCoalesceExec(numPartitions, coalescer, planLater(child)) :: Nil
+ case RenameTable(ResolvedV2View(oldCatalog: ViewCatalog, oldIdent),
newName, isView@true) =>
+ val newIdent = Spark3Util.catalogAndIdentifier(spark,
newName.toList.asJava)
+ if (oldCatalog.name != newIdent.catalog().name()) {
+ throw new AnalysisException(
+ s"Cannot move view between catalogs: from=${oldCatalog.name} and
to=${newIdent.catalog().name()}")
+ }
+ RenameV2ViewExec(oldCatalog, oldIdent, newIdent.identifier()) :: Nil
+
case _ => Nil
}
diff --git
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameV2ViewExec.scala
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameV2ViewExec.scala
new file mode 100644
index 0000000000..61d362044c
--- /dev/null
+++
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameV2ViewExec.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+
+
+case class RenameV2ViewExec(
+ catalog: ViewCatalog,
+ oldIdent: Identifier,
+ newIdent: Identifier) extends LeafV2CommandExec {
+
+ override lazy val output: Seq[Attribute] = Nil
+
+ override protected def run(): Seq[InternalRow] = {
+ catalog.renameView(oldIdent, newIdent)
+
+ Seq.empty
+ }
+
+
+ override def simpleString(maxFields: Int): String = {
+    s"RenameV2View ${oldIdent} to ${newIdent}"
+ }
+}
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
index 94e86e5ee3..9b179a6d11 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
@@ -23,6 +23,7 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -590,10 +591,7 @@ public class TestViews extends SparkExtensionsTestBase {
@Test
public void fullFunctionIdentifierNotRewrittenLoadFailure() {
String viewName = "fullFunctionIdentifierNotRewrittenLoadFailure";
- String sql =
- String.format(
- "SELECT spark_catalog.system.bucket(100, 'a') AS bucket_result,
'a' AS value",
- catalogName);
+ String sql = "SELECT spark_catalog.system.bucket(100, 'a') AS
bucket_result, 'a' AS value";
// avoid namespace failures
sql("USE spark_catalog");
@@ -635,6 +633,157 @@ public class TestViews extends SparkExtensionsTestBase {
return Spark3Util.loadIcebergCatalog(spark, catalogName);
}
+ @Test
+ public void renameView() throws NoSuchTableException {
+ insertRows(10);
+ String viewName = viewName("originalView");
+ String renamedView = viewName("renamedView");
+ String sql = String.format("SELECT id FROM %s", tableName);
+
+ ViewCatalog viewCatalog = viewCatalog();
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, viewName))
+ .withQuery("spark", sql)
+ .withDefaultNamespace(NAMESPACE)
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema(sql))
+ .create();
+
+ sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView);
+
+ List<Object[]> expected =
+ IntStream.rangeClosed(1,
10).mapToObj(this::row).collect(Collectors.toList());
+ assertThat(sql("SELECT * FROM %s", renamedView))
+ .hasSize(10)
+ .containsExactlyInAnyOrderElementsOf(expected);
+ }
+
+ @Test
+ public void renameViewHiddenByTempView() throws NoSuchTableException {
+ insertRows(10);
+ String viewName = viewName("originalView");
+ String renamedView = viewName("renamedView");
+ String sql = String.format("SELECT id FROM %s WHERE id > 5", tableName);
+
+ ViewCatalog viewCatalog = viewCatalog();
+
+ sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5",
viewName, tableName);
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, viewName))
+ .withQuery("spark", sql)
+ .withDefaultNamespace(NAMESPACE)
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema(sql))
+ .create();
+
+ // renames the TEMP VIEW
+ sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView);
+ assertThat(sql("SELECT * FROM %s", renamedView))
+ .hasSize(5)
+ .containsExactlyInAnyOrderElementsOf(
+ IntStream.rangeClosed(1,
5).mapToObj(this::row).collect(Collectors.toList()));
+
+ // original view still exists with its name
+ assertThat(viewCatalog.viewExists(TableIdentifier.of(NAMESPACE,
viewName))).isTrue();
+ assertThat(viewCatalog.viewExists(TableIdentifier.of(NAMESPACE,
renamedView))).isFalse();
+ assertThat(sql("SELECT * FROM %s", viewName))
+ .hasSize(5)
+ .containsExactlyInAnyOrderElementsOf(
+ IntStream.rangeClosed(6,
10).mapToObj(this::row).collect(Collectors.toList()));
+
+ // will rename the Iceberg view
+ sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView);
+ assertThat(viewCatalog.viewExists(TableIdentifier.of(NAMESPACE,
renamedView))).isTrue();
+ }
+
+ @Test
+ public void renameViewToDifferentTargetCatalog() {
+ String viewName = viewName("originalView");
+ String renamedView = viewName("renamedView");
+ String sql = String.format("SELECT id FROM %s", tableName);
+
+ ViewCatalog viewCatalog = viewCatalog();
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, viewName))
+ .withQuery("spark", sql)
+ .withDefaultNamespace(NAMESPACE)
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema(sql))
+ .create();
+
+ assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO spark_catalog.%s",
viewName, renamedView))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "Cannot move view between catalogs: from=spark_with_views and
to=spark_catalog");
+ }
+
+ @Test
+ public void renameNonExistingView() {
+ assertThatThrownBy(() -> sql("ALTER VIEW non_existing RENAME TO target"))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("The table or view `non_existing` cannot be
found");
+ }
+
+ @Test
+ public void renameViewTargetAlreadyExistsAsView() {
+ String viewName = viewName("renameViewSource");
+ String target = viewName("renameViewTarget");
+ String sql = String.format("SELECT id FROM %s", tableName);
+
+ ViewCatalog viewCatalog = viewCatalog();
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, viewName))
+ .withQuery("spark", sql)
+ .withDefaultNamespace(NAMESPACE)
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema(sql))
+ .create();
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, target))
+ .withQuery("spark", sql)
+ .withDefaultNamespace(NAMESPACE)
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema(sql))
+ .create();
+
+ assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO %s", viewName,
target))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ String.format("Cannot create view default.%s because it already
exists", target));
+ }
+
+ @Test
+ public void renameViewTargetAlreadyExistsAsTable() {
+ String viewName = viewName("renameViewSource");
+ String target = viewName("renameViewTarget");
+ String sql = String.format("SELECT id FROM %s", tableName);
+
+ ViewCatalog viewCatalog = viewCatalog();
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, viewName))
+ .withQuery("spark", sql)
+ .withDefaultNamespace(NAMESPACE)
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema(sql))
+ .create();
+
+ sql("CREATE TABLE %s (id INT, data STRING)", target);
+ assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO %s", viewName,
target))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ String.format("Cannot create view default.%s because it already
exists", target));
+ }
+
+ private String viewName(String viewName) {
+ return viewName + new Random().nextInt(1000000);
+ }
+
private void insertRows(int numRows) throws NoSuchTableException {
List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows);
for (int i = 1; i <= numRows; i++) {
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 01804e0108..82665194df 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -588,8 +588,18 @@ public class SparkCatalog extends BaseCatalog
@Override
public void renameView(Identifier fromIdentifier, Identifier toIdentifier)
throws NoSuchViewException, ViewAlreadyExistsException {
- throw new UnsupportedOperationException(
- "Renaming a view is not supported by catalog: " + catalogName);
+ if (null != asViewCatalog) {
+ try {
+ asViewCatalog.renameView(buildIdentifier(fromIdentifier),
buildIdentifier(toIdentifier));
+ } catch (org.apache.iceberg.exceptions.NoSuchViewException e) {
+ throw new NoSuchViewException(fromIdentifier);
+ } catch (org.apache.iceberg.exceptions.AlreadyExistsException e) {
+ throw new ViewAlreadyExistsException(toIdentifier);
+ }
+ } else {
+ throw new UnsupportedOperationException(
+ "Renaming a view is not supported by catalog: " + catalogName);
+ }
}
@Override