This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 0536ff3989 Spark 3.4: Rewrite identifier when using Subquery expressions in View (#9594)
0536ff3989 is described below
commit 0536ff39896f70afb77d3670894369c2d177980a
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Wed Jan 31 10:30:02 2024 +0100
Spark 3.4: Rewrite identifier when using Subquery expressions in View (#9594)
---
.../spark/sql/catalyst/analysis/ResolveViews.scala | 6 +++
.../apache/iceberg/spark/extensions/TestViews.java | 53 ++++++++++++++++++++++
2 files changed, 59 insertions(+)
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
index 29ead971ac..b22fd50d3b 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -22,6 +22,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.expressions.UpCast
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -166,6 +167,11 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
u.copy(multipartIdentifier = catalogAndNamespace :+ table)
case u@UnresolvedRelation(parts, _, _) if !isCatalog(parts.head) =>
u.copy(multipartIdentifier = catalogAndNamespace.head +: parts)
+ case other =>
+ other.transformExpressions {
+ case subquery: SubqueryExpression =>
+ subquery.withNewPlan(qualifyTableIdentifiers(subquery.plan, catalogAndNamespace))
+ }
}
private def isCatalog(name: String): Boolean = {
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
index 3cdc99be45..41edf47cc1 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
@@ -877,6 +877,9 @@ public class TestViews extends SparkExtensionsTestBase {
v1SessionCatalog()
.tableExists(new org.apache.spark.sql.catalyst.TableIdentifier(v1View)))
.isFalse();
+
+ sql("USE spark_catalog");
+ sql("DROP TABLE IF EXISTS %s", tableName);
}
private SessionCatalog v1SessionCatalog() {
@@ -1149,6 +1152,56 @@ public class TestViews extends SparkExtensionsTestBase {
"because it references to the temporary object global_temp.%s", globalTempView));
}
+ @Test
+ public void createViewWithSubqueryExpressionInFilterThatIsRewritten()
+ throws NoSuchTableException {
+ insertRows(5);
+ String viewName = viewName("viewWithSubqueryExpression");
+ String sql =
+ String.format(
+ "SELECT id FROM %s WHERE id = (SELECT max(id) FROM %s)", tableName, tableName);
+
+ sql("CREATE VIEW %s AS %s", viewName, sql);
+
+ assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(5));
+
+ sql("USE spark_catalog");
+
+ assertThatThrownBy(() -> sql(sql))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(String.format("The table or view `%s` cannot be found", tableName));
+
+ // the underlying SQL in the View should be rewritten to have catalog & namespace
+ assertThat(sql("SELECT * FROM %s.%s.%s", catalogName, "default", viewName))
+ .hasSize(1)
+ .containsExactly(row(5));
+ }
+
+ @Test
+ public void createViewWithSubqueryExpressionInQueryThatIsRewritten() throws NoSuchTableException {
+ insertRows(3);
+ String viewName = viewName("viewWithSubqueryExpression");
+ String sql =
+ String.format("SELECT (SELECT max(id) FROM %s) max_id FROM %s", tableName, tableName);
+
+ sql("CREATE VIEW %s AS %s", viewName, sql);
+
+ assertThat(sql("SELECT * FROM %s", viewName))
+ .hasSize(3)
+ .containsExactly(row(3), row(3), row(3));
+
+ sql("USE spark_catalog");
+
+ assertThatThrownBy(() -> sql(sql))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(String.format("The table or view `%s` cannot be found", tableName));
+
+ // the underlying SQL in the View should be rewritten to have catalog & namespace
+ assertThat(sql("SELECT * FROM %s.%s.%s", catalogName, "default", viewName))
+ .hasSize(3)
+ .containsExactly(row(3), row(3), row(3));
+ }
+
private void insertRows(int numRows) throws NoSuchTableException {
List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows);
for (int i = 1; i <= numRows; i++) {