This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 4579b7a1e9 Spark: Fail on recursive cycle in view (#9834)
4579b7a1e9 is described below

commit 4579b7a1e941780496547f313e6fa07e712b09c5
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Wed Mar 27 07:58:18 2024 +0100

    Spark: Fail on recursive cycle in view (#9834)
---
 .../spark/sql/catalyst/analysis/CheckViews.scala   | 52 ++++++++++++-
 .../apache/iceberg/spark/extensions/TestViews.java | 87 ++++++++++++++++++++++
 2 files changed, 138 insertions(+), 1 deletion(-)

diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
index 9a2dee997a..b8cd102029 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
@@ -20,8 +20,12 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.AlterViewAs
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.plans.logical.View
 import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
 import org.apache.spark.sql.connector.catalog.ViewCatalog
@@ -30,12 +34,18 @@ import org.apache.spark.sql.util.SchemaUtils
 
 object CheckViews extends (LogicalPlan => Unit) {
 
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
   override def apply(plan: LogicalPlan): Unit = {
     plan foreach {
       case CreateIcebergView(resolvedIdent@ResolvedIdentifier(_: ViewCatalog, _), _, query, columnAliases, _,
-      _, _, _, _, _, _) =>
+      _, _, _, _, replace, _) =>
         verifyColumnCount(resolvedIdent, columnAliases, query)
         SchemaUtils.checkColumnNameDuplication(query.schema.fieldNames, SQLConf.get.resolver)
+        if (replace) {
+          val viewIdent: Seq[String] = resolvedIdent.catalog.name() +: resolvedIdent.identifier.asMultipartIdentifier
+          checkCyclicViewReference(viewIdent, query, Seq(viewIdent))
+        }
 
       case AlterViewAs(ResolvedV2View(_, _), _, _) =>
         throw new AnalysisException("ALTER VIEW <viewName> AS is not supported. Use CREATE OR REPLACE VIEW instead")
@@ -63,4 +73,44 @@ object CheckViews extends (LogicalPlan => Unit) {
       }
     }
   }
+
+  private def checkCyclicViewReference(
+    viewIdent: Seq[String],
+    plan: LogicalPlan,
+    cyclePath: Seq[Seq[String]]): Unit = {
+    plan match {
+      case sub@SubqueryAlias(_, Project(_, _)) =>
+        val currentViewIdent: Seq[String] = sub.identifier.qualifier :+ sub.identifier.name
+        checkIfRecursiveView(viewIdent, currentViewIdent, cyclePath, sub.children)
+      case v1View: View =>
+        val currentViewIdent: Seq[String] = v1View.desc.identifier.nameParts
+        checkIfRecursiveView(viewIdent, currentViewIdent, cyclePath, v1View.children)
+      case _ =>
+        plan.children.foreach(child => checkCyclicViewReference(viewIdent, child, cyclePath))
+    }
+
+    plan.expressions.flatMap(_.flatMap {
+      case e: SubqueryExpression =>
+        checkCyclicViewReference(viewIdent, e.plan, cyclePath)
+        None
+      case _ => None
+    })
+  }
+
+  private def checkIfRecursiveView(
+    viewIdent: Seq[String],
+    currentViewIdent: Seq[String],
+    cyclePath: Seq[Seq[String]],
+    children: Seq[LogicalPlan]
+  ): Unit = {
+    val newCyclePath = cyclePath :+ currentViewIdent
+    if (currentViewIdent == viewIdent) {
+      throw new AnalysisException(String.format("Recursive cycle in view detected: %s (cycle: %s)",
+        viewIdent.asIdentifier, newCyclePath.map(p => p.mkString(".")).mkString(" -> ")))
+    } else {
+      children.foreach { c =>
+        checkCyclicViewReference(viewIdent, c, newCyclePath)
+      }
+    }
+  }
 }
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
index bd611b936a..3cc1e32d00 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
@@ -1866,6 +1866,93 @@ public class TestViews extends ExtensionsTestBase {
         
.isEqualTo(ImmutableSQLViewRepresentation.builder().dialect("spark").sql(sql).build());
   }
 
+  @TestTemplate
+  public void createViewWithRecursiveCycle() {
+    String viewOne = viewName("viewOne");
+    String viewTwo = viewName("viewTwo");
+
+    sql("CREATE VIEW %s AS SELECT * FROM %s", viewOne, tableName);
+    // viewTwo points to viewOne
+    sql("CREATE VIEW %s AS SELECT * FROM %s", viewTwo, viewOne);
+
+    // viewOne points to viewTwo points to viewOne, creating a recursive cycle
+    String view1 = String.format("%s.%s.%s", catalogName, NAMESPACE, viewOne);
+    String view2 = String.format("%s.%s.%s", catalogName, NAMESPACE, viewTwo);
+    String cycle = String.format("%s -> %s -> %s", view1, view2, view1);
+    assertThatThrownBy(() -> sql("CREATE OR REPLACE VIEW %s AS SELECT * FROM %s", viewOne, view2))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageStartingWith(
+            String.format("Recursive cycle in view detected: %s (cycle: %s)", view1, cycle));
+  }
+
+  @TestTemplate
+  public void createViewWithRecursiveCycleToV1View() {
+    String viewOne = viewName("view_one");
+    String viewTwo = viewName("view_two");
+
+    sql("CREATE VIEW %s AS SELECT * FROM %s", viewOne, tableName);
+    // viewTwo points to viewOne
+    sql("USE spark_catalog");
+    sql("CREATE VIEW %s AS SELECT * FROM %s.%s.%s", viewTwo, catalogName, NAMESPACE, viewOne);
+
+    sql("USE %s", catalogName);
+    // viewOne points to viewTwo points to viewOne, creating a recursive cycle
+    String view1 = String.format("%s.%s.%s", catalogName, NAMESPACE, viewOne);
+    String view2 = String.format("%s.%s.%s", "spark_catalog", NAMESPACE, viewTwo);
+    String cycle = String.format("%s -> %s -> %s", view1, view2, view1);
+    assertThatThrownBy(() -> sql("CREATE OR REPLACE VIEW %s AS SELECT * FROM %s", viewOne, view2))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageStartingWith(
+            String.format("Recursive cycle in view detected: %s (cycle: %s)", view1, cycle));
+  }
+
+  @TestTemplate
+  public void createViewWithRecursiveCycleInCTE() {
+    String viewOne = viewName("viewOne");
+    String viewTwo = viewName("viewTwo");
+
+    sql("CREATE VIEW %s AS SELECT * FROM %s", viewOne, tableName);
+    // viewTwo points to viewOne
+    sql("CREATE VIEW %s AS SELECT * FROM %s", viewTwo, viewOne);
+
+    // CTE points to viewTwo
+    String sql =
+        String.format(
+            "WITH max_by_data AS (SELECT max(id) as max FROM %s) "
+                + "SELECT max, count(1) AS count FROM max_by_data GROUP BY max",
+            viewTwo);
+
+    // viewOne points to CTE, creating a recursive cycle
+    String view1 = String.format("%s.%s.%s", catalogName, NAMESPACE, viewOne);
+    String cycle = String.format("%s -> %s -> %s", view1, viewTwo, view1);
+    assertThatThrownBy(() -> sql("CREATE OR REPLACE VIEW %s AS %s", viewOne, sql))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageStartingWith(
+            String.format("Recursive cycle in view detected: %s (cycle: %s)", view1, cycle));
+  }
+
+  @TestTemplate
+  public void createViewWithRecursiveCycleInSubqueryExpression() {
+    String viewOne = viewName("viewOne");
+    String viewTwo = viewName("viewTwo");
+
+    sql("CREATE VIEW %s AS SELECT * FROM %s", viewOne, tableName);
+    // viewTwo points to viewOne
+    sql("CREATE VIEW %s AS SELECT * FROM %s", viewTwo, viewOne);
+
+    // subquery expression points to viewTwo
+    String sql =
+        String.format("SELECT * FROM %s WHERE id = (SELECT id FROM %s)", tableName, viewTwo);
+
+    // viewOne points to subquery expression, creating a recursive cycle
+    String view1 = String.format("%s.%s.%s", catalogName, NAMESPACE, viewOne);
+    String cycle = String.format("%s -> %s -> %s", view1, viewTwo, view1);
+    assertThatThrownBy(() -> sql("CREATE OR REPLACE VIEW %s AS %s", viewOne, sql))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageStartingWith(
+            String.format("Recursive cycle in view detected: %s (cycle: %s)", view1, cycle));
+  }
+
   private void insertRows(int numRows) throws NoSuchTableException {
     List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows);
     for (int i = 1; i <= numRows; i++) {

Reply via email to