This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 3f6e377a1e Spark: Support creating views via SQL (#9423)
3f6e377a1e is described below
commit 3f6e377a1e994714bfc41c75d9aec7bdb85c95a9
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Fri Jan 26 16:44:01 2024 +0100
Spark: Support creating views via SQL (#9423)
---
.../extensions/IcebergSparkSessionExtensions.scala | 2 +
.../spark/sql/catalyst/analysis/CheckViews.scala | 62 +++++
.../spark/sql/catalyst/analysis/ResolveViews.scala | 28 ++-
.../catalyst/analysis/RewriteViewCommands.scala | 61 +++++
.../plans/logical/views/CreateIcebergView.scala | 44 ++++
.../datasources/v2/CreateV2ViewExec.scala | 98 ++++++++
.../v2/ExtendedDataSourceV2Strategy.scala | 16 ++
.../apache/iceberg/spark/extensions/TestViews.java | 263 +++++++++++++++++++++
.../org/apache/iceberg/spark/SparkCatalog.java | 31 +++
.../org/apache/iceberg/spark/source/SparkView.java | 7 +-
10 files changed, 609 insertions(+), 3 deletions(-)
diff --git
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
index ad9df3994f..3fca29c294 100644
---
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
+++
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
@@ -20,6 +20,7 @@
package org.apache.iceberg.spark.extensions
import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.spark.sql.catalyst.analysis.CheckViews
import org.apache.spark.sql.catalyst.analysis.ProcedureArgumentCoercion
import org.apache.spark.sql.catalyst.analysis.ResolveProcedures
import org.apache.spark.sql.catalyst.analysis.ResolveViews
@@ -37,6 +38,7 @@ class IcebergSparkSessionExtensions extends
(SparkSessionExtensions => Unit) {
extensions.injectResolutionRule { spark => ResolveProcedures(spark) }
extensions.injectResolutionRule { spark => ResolveViews(spark) }
extensions.injectResolutionRule { _ => ProcedureArgumentCoercion }
+ extensions.injectCheckRule(_ => CheckViews)
// optimizer extensions
extensions.injectOptimizerRule { _ => ReplaceStaticInvoke }
diff --git
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
new file mode 100644
index 0000000000..4a1736764d
--- /dev/null
+++
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.SchemaUtils
+
+/**
+ * Analyzer check rule (registered through injectCheckRule) that validates CreateIcebergView
+ * commands whose identifier resolves to a [[ViewCatalog]]: the number of user-specified column
+ * names must match the query output, and the view's column names must be unique.
+ */
+object CheckViews extends (LogicalPlan => Unit) {
+
+  override def apply(plan: LogicalPlan): Unit = plan.foreach {
+    case view @ CreateIcebergView(
+        ResolvedIdentifier(_: ViewCatalog, ident), _, _, _, _, _, _, _, _, _, _) =>
+      val query = view.query
+      verifyColumnCount(ident, view.columnAliases, query)
+      // ResolveViews (a resolution rule) has already projected the aliases onto the query when
+      // the counts match, so this rejects duplicate aliases as well as duplicate query columns
+      // when no aliases were given
+      SchemaUtils.checkColumnNameDuplication(query.schema.fieldNames, SQLConf.get.resolver)
+
+    case _ => // no view-specific checks for any other node
+  }
+
+  /**
+   * Fails when column names were specified and their count differs from the query output.
+   *
+   * @param ident identifier of the view being created, used in the error message
+   * @param columns user-specified column names; an empty list disables the check
+   * @param query the view query whose output is compared against `columns`
+   * @throws AnalysisException CREATE_VIEW_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS or
+   *                           CREATE_VIEW_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS
+   */
+  private def verifyColumnCount(ident: Identifier, columns: Seq[String], query: LogicalPlan): Unit = {
+    val dataColumns = query.output
+    if (columns.nonEmpty && columns.length != dataColumns.length) {
+      val errorClass =
+        if (columns.length > dataColumns.length) {
+          "CREATE_VIEW_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS"
+        } else {
+          "CREATE_VIEW_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS"
+        }
+
+      throw new AnalysisException(
+        errorClass = errorClass,
+        messageParameters = Map(
+          "viewName" -> ident.toString,
+          "viewColumns" -> columns.mkString(", "),
+          "dataColumns" -> dataColumns.map(_.name).mkString(", ")))
+    }
+  }
+}
diff --git
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
index a1b826569f..5616f6f70b 100644
---
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
+++
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
@@ -38,6 +39,7 @@ import org.apache.spark.sql.connector.catalog.LookupCatalog
import org.apache.spark.sql.connector.catalog.View
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.MetadataBuilder
case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with
LookupCatalog {
@@ -59,6 +61,30 @@ case class ResolveViews(spark: SparkSession) extends
Rule[LogicalPlan] with Look
loadView(catalog, ident)
.map(_ => ResolvedV2View(catalog.asViewCatalog, ident))
.getOrElse(u)
+
+ case c@CreateIcebergView(ResolvedIdentifier(_, _), _, query,
columnAliases, columnComments, _, _, _, _, _, _)
+ if query.resolved && !c.rewritten =>
+ val aliased = aliasColumns(query, columnAliases, columnComments)
+ c.copy(query = aliased, queryColumnNames = query.schema.fieldNames,
rewritten = true)
+ }
+
+  /**
+   * Wraps `plan` in a [[Project]] that renames every output column to the user-specified alias
+   * at the same position and attaches the optional per-column comment as "comment" metadata.
+   *
+   * @param plan the resolved view query
+   * @param columnAliases user-specified column names, matched to `plan.output` by position
+   * @param columnComments optional comment per alias, at the same positions as `columnAliases`
+   * @return `plan` unchanged when no aliases were given or when their count differs from the
+   *         plan output (CheckViews reports that mismatch); otherwise the aliasing projection
+   */
+  private def aliasColumns(
+      plan: LogicalPlan,
+      columnAliases: Seq[String],
+      columnComments: Seq[Option[String]]): LogicalPlan = {
+    if (columnAliases.isEmpty || columnAliases.length != plan.output.length) {
+      plan
+    } else {
+      val projectList = plan.output.zipWithIndex.map { case (attr, pos) =>
+        // None is Alias's default for explicitMetadata, so columns without a comment get a
+        // plain alias exactly as before
+        val metadata = columnComments(pos).map { text =>
+          new MetadataBuilder().putString("comment", text).build()
+        }
+        Alias(attr, columnAliases(pos))(explicitMetadata = metadata)
+      }
+      Project(projectList, plan)
+    }
   }
def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] =
catalog match {
@@ -151,7 +177,7 @@ case class ResolveViews(spark: SparkSession) extends
Rule[LogicalPlan] with Look
}
- implicit class ViewHelper(plugin: CatalogPlugin) {
+ implicit class IcebergViewHelper(plugin: CatalogPlugin) {
def asViewCatalog: ViewCatalog = plugin match {
case viewCatalog: ViewCatalog =>
viewCatalog
diff --git
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
index 2b35db33c0..066ba59394 100644
---
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
+++
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
@@ -19,13 +19,19 @@
package org.apache.spark.sql.catalyst.analysis
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
+import org.apache.spark.sql.catalyst.plans.logical.CreateView
import org.apache.spark.sql.catalyst.plans.logical.DropView
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.View
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.LookupCatalog
import org.apache.spark.sql.connector.catalog.ViewCatalog
@@ -40,6 +46,20 @@ case class RewriteViewCommands(spark: SparkSession) extends
Rule[LogicalPlan] wi
+  /**
+   * Rewrites DROP VIEW and CREATE VIEW plans whose identifier matches [[ResolvedView]] into
+   * [[DropIcebergView]] and [[CreateIcebergView]]. CREATE VIEW is only rewritten when the
+   * original SQL text is available, because that text is what gets passed on as the view query.
+   */
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
     case DropView(ResolvedView(resolved), ifExists) =>
       DropIcebergView(resolved, ifExists)
+
+    case CreateView(ResolvedView(resolved), userSpecifiedColumns, comment, properties,
+        Some(queryText), query, allowExisting, replace) =>
+      // Substitute CTE definitions before searching for temporary views, so that a CTE sharing
+      // its name with a temp view is not reported as a reference to that temp view
+      val q = CTESubstitution.apply(query)
+      verifyTemporaryObjectsDontExist(resolved.identifier, q)
+      CreateIcebergView(
+        child = resolved,
+        queryText = queryText,
+        query = q,
+        columnAliases = userSpecifiedColumns.map(_._1),
+        // the per-column comment is already an Option[String]
+        columnComments = userSpecifiedColumns.map(_._2),
+        comment = comment,
+        properties = properties,
+        allowExisting = allowExisting,
+        replace = replace)
   }
private def isTempView(nameParts: Seq[String]): Boolean = {
@@ -62,4 +82,45 @@ case class RewriteViewCommands(spark: SparkSession) extends
Rule[LogicalPlan] wi
None
}
}
+
+  /**
+   * Permanent views are not allowed to reference temporary views.
+   *
+   * @param name identifier of the view being created, used in the error message
+   * @param child the view query to inspect
+   * @throws AnalysisException INVALID_TEMP_OBJ_REFERENCE naming the first temporary view found
+   */
+  private def verifyTemporaryObjectsDontExist(
+      name: Identifier,
+      child: LogicalPlan): Unit = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+    // only the first offending temp view is reported
+    collectTemporaryViews(child).headOption.foreach { tempViewNameParts =>
+      throw new AnalysisException(
+        errorClass = "INVALID_TEMP_OBJ_REFERENCE",
+        messageParameters = Map(
+          "obj" -> "VIEW",
+          "objName" -> name.name(),
+          "tempObj" -> "VIEW",
+          "tempObjName" -> tempViewNameParts.quoted))
+    }
+
+    // TODO: check for temp function names
+  }
+
+  /**
+   * Collects the name parts of every temporary view referenced by `child`: unresolved relations
+   * whose name is a temp view, resolved temp [[View]] nodes, and references made from inside
+   * subquery expressions. Duplicates are removed.
+   */
+  private def collectTemporaryViews(child: LogicalPlan): Seq[Seq[String]] = {
+    child.flatMap {
+      case relation: UnresolvedRelation if isTempView(relation.multipartIdentifier) =>
+        Seq(relation.multipartIdentifier)
+      case view: View if view.isTempView =>
+        Seq(view.desc.identifier.nameParts)
+      case node =>
+        // subquery plans are not children of the node, so descend into them explicitly
+        node.expressions.flatMap(_.flatMap {
+          case subquery: SubqueryExpression => collectTemporaryViews(subquery.plan)
+          case _ => Seq.empty
+        })
+    }.distinct
+  }
}
diff --git
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
new file mode 100644
index 0000000000..9366d5efe1
--- /dev/null
+++
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.views
+
+import org.apache.spark.sql.catalyst.plans.logical.BinaryCommand
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+ * Logical command for CREATE [OR REPLACE] VIEW [IF NOT EXISTS] on an Iceberg view catalog.
+ * The target identifier (`child`) and the view query (`query`) are its left and right children.
+ *
+ * @param child the resolved identifier of the view to create
+ * @param queryText the original SQL text of the view query
+ * @param query the view query plan
+ * @param columnAliases user-specified column names; empty when none were given
+ * @param columnComments optional comment for each user-specified column
+ * @param queryColumnNames output column names of the query before aliasing, set by ResolveViews
+ * @param comment optional view comment
+ * @param properties view properties
+ * @param allowExisting whether IF NOT EXISTS was specified
+ * @param replace whether OR REPLACE was specified
+ * @param rewritten whether ResolveViews has already applied the column aliases to `query`
+ */
+case class CreateIcebergView(
+    child: LogicalPlan,
+    queryText: String,
+    query: LogicalPlan,
+    columnAliases: Seq[String],
+    columnComments: Seq[Option[String]],
+    queryColumnNames: Seq[String] = Seq.empty,
+    comment: Option[String],
+    properties: Map[String, String],
+    allowExisting: Boolean,
+    replace: Boolean,
+    rewritten: Boolean = false) extends BinaryCommand {
+
+  override def left: LogicalPlan = this.child
+  override def right: LogicalPlan = this.query
+
+  override protected def withNewChildrenInternal(
+      newLeft: LogicalPlan,
+      newRight: LogicalPlan): LogicalPlan = this.copy(child = newLeft, query = newRight)
+}
diff --git
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
new file mode 100644
index 0000000000..892e1eb857
--- /dev/null
+++
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.types.StructType
+import scala.collection.JavaConverters._
+
+
+/**
+ * Executes CREATE [OR REPLACE] VIEW [IF NOT EXISTS] against a [[ViewCatalog]].
+ *
+ * Besides the user-supplied `properties`, the view is created with the view comment (if any)
+ * under [[ViewCatalog.PROP_COMMENT]] and the Spark version under
+ * [[ViewCatalog.PROP_CREATE_ENGINE_VERSION]] and [[ViewCatalog.PROP_ENGINE_VERSION]].
+ */
+case class CreateV2ViewExec(
+    catalog: ViewCatalog,
+    ident: Identifier,
+    queryText: String,
+    viewSchema: StructType,
+    columnAliases: Seq[String],
+    columnComments: Seq[Option[String]],
+    queryColumnNames: Seq[String],
+    comment: Option[String],
+    properties: Map[String, String],
+    allowExisting: Boolean,
+    replace: Boolean) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    val catalogManager = session.sessionState.catalogManager
+    val currentCatalogName = catalogManager.currentCatalog.name
+    // the current catalog is only passed on when it differs from the view's own catalog
+    val currentCatalog = if (catalog.name() != currentCatalogName) currentCatalogName else null
+    val currentNamespace = catalogManager.currentNamespace
+
+    val engineVersion = s"Spark ${org.apache.spark.SPARK_VERSION}"
+    // later entries win, so the engine version properties override user-supplied values;
+    // `++ Map(...)` replaces the varargs Map `+`, which is deprecated in Scala 2.13
+    val newProperties = properties ++
+      comment.map(ViewCatalog.PROP_COMMENT -> _) ++
+      Map(
+        ViewCatalog.PROP_CREATE_ENGINE_VERSION -> engineVersion,
+        ViewCatalog.PROP_ENGINE_VERSION -> engineVersion)
+
+    // the create and replace paths issue exactly the same catalog call
+    def createView() = catalog.createView(
+      ident,
+      queryText,
+      currentCatalog,
+      currentNamespace,
+      viewSchema,
+      queryColumnNames.toArray,
+      columnAliases.toArray,
+      columnComments.map(_.orNull).toArray,
+      newProperties.asJava)
+
+    if (replace) {
+      // CREATE OR REPLACE VIEW
+      // FIXME: replaceView API doesn't exist in Spark 3.5, so replacing is emulated by dropping
+      // the existing view and creating it again, which is not atomic
+      if (catalog.viewExists(ident)) {
+        catalog.dropView(ident)
+      }
+      createView()
+    } else {
+      // CREATE VIEW [IF NOT EXISTS]
+      try {
+        createView()
+      } catch {
+        case _: ViewAlreadyExistsException if allowExisting => // Ignore
+      }
+    }
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"CreateV2ViewExec: ${ident}"
+  }
+}
diff --git
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index 44157fc382..0505fe4e30 100644
---
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.plans.logical.RenameTable
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
import
org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
import org.apache.spark.sql.connector.catalog.Identifier
@@ -107,6 +108,21 @@ case class ExtendedDataSourceV2Strategy(spark:
SparkSession) extends Strategy wi
case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident),
ifExists) =>
DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil
+ case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog,
ident), queryText, query,
+ columnAliases, columnComments, queryColumnNames, comment, properties,
allowExisting, replace, _) =>
+ CreateV2ViewExec(
+ catalog = viewCatalog,
+ ident = ident,
+ queryText = queryText,
+ columnAliases = columnAliases,
+ columnComments = columnComments,
+ queryColumnNames = queryColumnNames,
+ viewSchema = query.schema,
+ comment = comment,
+ properties = properties,
+ allowExisting = allowExisting,
+ replace = replace) :: Nil
+
case _ => Nil
}
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
index 0eb8c96b3c..bf6509afee 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
@@ -40,6 +40,7 @@ import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.view.View;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -886,6 +887,268 @@ public class TestViews extends SparkExtensionsTestBase {
return viewName + new Random().nextInt(1000000);
}
+  @Test
+  public void createViewIfNotExists() {
+    String viewName = "viewThatAlreadyExists";
+    String createView = "CREATE VIEW %s AS SELECT id FROM %s";
+    sql(createView, viewName, tableName);
+
+    // a second plain CREATE VIEW must fail because the view is already there
+    assertThatThrownBy(() -> sql(createView, viewName, tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            String.format(
+                "Cannot create view %s.%s because it already exists", NAMESPACE, viewName));
+
+    // with IF NOT EXISTS the same statement must not fail
+    assertThatNoException()
+        .isThrownBy(
+            () -> sql("CREATE VIEW IF NOT EXISTS %s AS SELECT id FROM %s", viewName, tableName));
+  }
+
+ @Test
+ public void createViewWithInvalidSQL() {
+ assertThatThrownBy(() -> sql("CREATE VIEW simpleViewWithInvalidSQL AS
invalid SQL"))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Syntax error");
+ }
+
+  @Test
+  public void createViewReferencingTempView() throws NoSuchTableException {
+    insertRows(10);
+    String tempView = "temporaryViewBeingReferencedInAnotherView";
+    String viewReferencingTempView = "viewReferencingTemporaryView";
+    sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", tempView, tableName);
+
+    // a persistent view must not depend on a session-scoped TEMP VIEW
+    assertThatThrownBy(
+            () -> sql("CREATE VIEW %s AS SELECT id FROM %s", viewReferencingTempView, tempView))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Cannot create the persistent object")
+        .hasMessageContaining(viewReferencingTempView)
+        .hasMessageContaining("of the type VIEW because it references to the temporary object")
+        .hasMessageContaining(tempView);
+  }
+
+  @Test
+  public void createViewReferencingGlobalTempView() throws NoSuchTableException {
+    insertRows(10);
+    String globalTempView = "globalTemporaryViewBeingReferenced";
+    String viewReferencingTempView = "viewReferencingGlobalTemporaryView";
+    sql(
+        "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5",
+        globalTempView, tableName);
+
+    // a global temp view, referenced through global_temp, is just as off-limits for a
+    // persistent view as a session-scoped temp view
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE VIEW %s AS SELECT id FROM global_temp.%s",
+                    viewReferencingTempView, globalTempView))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Cannot create the persistent object")
+        .hasMessageContaining(viewReferencingTempView)
+        .hasMessageContaining("of the type VIEW because it references to the temporary object")
+        .hasMessageContaining(globalTempView);
+  }
+
+  @Test
+  public void createViewUsingNonExistingTable() {
+    // the query reads from a relation that was never created
+    assertThatThrownBy(
+            () -> sql("CREATE VIEW viewWithNonExistingTable AS SELECT id FROM non_existing"))
+        .hasMessageContaining("The table or view `non_existing` cannot be found")
+        .isInstanceOf(AnalysisException.class);
+  }
+
+  @Test
+  public void createViewWithMismatchedColumnCounts() {
+    String viewName = "viewWithMismatchedColumnCounts";
+    String expectedPrefix = String.format("Cannot create view %s.%s", NAMESPACE, viewName);
+
+    // more view columns than query columns
+    assertThatThrownBy(
+            () -> sql("CREATE VIEW %s (id, data) AS SELECT id FROM %s", viewName, tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(expectedPrefix)
+        .hasMessageContaining("not enough data columns")
+        .hasMessageContaining("View columns: id, data")
+        .hasMessageContaining("Data columns: id");
+
+    // fewer view columns than query columns
+    assertThatThrownBy(
+            () -> sql("CREATE VIEW %s (id) AS SELECT id, data FROM %s", viewName, tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(expectedPrefix)
+        .hasMessageContaining("too many data columns")
+        .hasMessageContaining("View columns: id")
+        .hasMessageContaining("Data columns: id, data");
+  }
+
+  @Test
+  public void createViewWithColumnAliases() throws NoSuchTableException {
+    insertRows(6);
+    String viewName = "viewWithColumnAliases";
+
+    sql(
+        "CREATE VIEW %s (new_id COMMENT 'ID', new_data COMMENT 'DATA') AS SELECT id, data FROM %s WHERE id <= 3",
+        viewName, tableName);
+
+    // the original query column names are kept in the view properties, while the schema
+    // exposes the aliases together with their comments
+    View view = viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName));
+    assertThat(view.properties()).containsEntry("queryColumnNames", "id,data");
+
+    List<Types.NestedField> columns = view.schema().columns();
+    assertThat(columns).hasSize(2);
+    assertThat(columns.get(0).name()).isEqualTo("new_id");
+    assertThat(columns.get(0).doc()).isEqualTo("ID");
+    assertThat(columns.get(1).name()).isEqualTo("new_data");
+    assertThat(columns.get(1).doc()).isEqualTo("DATA");
+
+    assertThat(sql("SELECT new_id FROM %s", viewName))
+        .hasSize(3)
+        .containsExactlyInAnyOrder(row(1), row(2), row(3));
+
+    // aliases are applied by position, so swapping the query column order must still work
+    sql("DROP VIEW %s", viewName);
+    sql(
+        "CREATE VIEW %s (new_data, new_id) AS SELECT data, id FROM %s WHERE id <= 3",
+        viewName, tableName);
+
+    assertThat(sql("SELECT new_id FROM %s", viewName))
+        .hasSize(3)
+        .containsExactlyInAnyOrder(row(1), row(2), row(3));
+  }
+
+  @Test
+  public void createViewWithDuplicateColumnNames() {
+    // both aliases are named new_id
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE VIEW viewWithDuplicateColumnNames (new_id, new_id) AS SELECT id, id FROM %s WHERE id <= 3",
+                    tableName))
+        .hasMessageContaining("The column `new_id` already exists")
+        .isInstanceOf(AnalysisException.class);
+  }
+
+  @Test
+  public void createViewWithDuplicateQueryColumnNames() throws NoSuchTableException {
+    insertRows(3);
+    String viewName = "viewWithDuplicateQueryColumnNames";
+    String query = String.format("SELECT id, id FROM %s WHERE id <= 3", tableName);
+
+    // without aliases the view would expose `id` twice, which is rejected
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, query))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("The column `id` already exists");
+
+    // distinct aliases make the very same query acceptable
+    sql("CREATE VIEW %s (id_one, id_two) AS %s", viewName, query);
+
+    assertThat(sql("SELECT * FROM %s", viewName))
+        .hasSize(3)
+        .containsExactlyInAnyOrder(row(1, 1), row(2, 2), row(3, 3));
+  }
+
+  @Test
+  public void createViewWithCTE() throws NoSuchTableException {
+    insertRows(10);
+    String viewName = "simpleViewWithCTE";
+    String cteQuery =
+        String.format(
+            "WITH max_by_data AS (SELECT max(id) as max FROM %s) "
+                + "SELECT max, count(1) AS count FROM max_by_data GROUP BY max",
+            tableName);
+
+    sql("CREATE VIEW %s AS %s", viewName, cteQuery);
+
+    // ids 1..10 were inserted, so the CTE yields a single row with max = 10
+    assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L));
+  }
+
+  @Test
+  public void createViewWithConflictingNamesForCTEAndTempView() throws NoSuchTableException {
+    insertRows(10);
+    String viewName = "viewWithConflictingNamesForCTEAndTempView";
+    String cteName = "cteName";
+    String cteQuery =
+        String.format(
+            "WITH %s AS (SELECT max(id) as max FROM %s) "
+                + "(SELECT max, count(1) AS count FROM %s GROUP BY max)",
+            cteName, tableName, cteName);
+
+    // a TEMP VIEW and a CTE share one name; the CTE reference must not be treated as a
+    // reference to the TEMP VIEW, so creating the view succeeds
+    sql("CREATE TEMPORARY VIEW %s AS SELECT * from %s", cteName, tableName);
+    sql("CREATE VIEW %s AS %s", viewName, cteQuery);
+
+    // CTE should take precedence over the TEMP VIEW when data is read
+    assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L));
+  }
+
+  @Test
+  public void createViewWithCTEReferencingTempView() {
+    String viewName = "viewWithCTEReferencingTempView";
+    String tempViewInCTE = "tempViewInCTE";
+    String cteQuery =
+        String.format(
+            "WITH max_by_data AS (SELECT max(id) as max FROM %s) "
+                + "SELECT max, count(1) AS count FROM max_by_data GROUP BY max",
+            tempViewInCTE);
+
+    sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE ID <= 5", tempViewInCTE, tableName);
+
+    // the temp view is only referenced from inside the CTE body and must still be detected
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, cteQuery))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Cannot create the persistent object")
+        .hasMessageContaining(viewName)
+        .hasMessageContaining("of the type VIEW because it references to the temporary object")
+        .hasMessageContaining(tempViewInCTE);
+  }
+
+  @Test
+  public void createViewWithNonExistingQueryColumn() {
+    // the selected column does not exist in the source table
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE VIEW viewWithNonExistingQueryColumn AS SELECT non_existing FROM %s WHERE id <= 3",
+                    tableName))
+        .hasMessageContaining(
+            "A column or function parameter with name `non_existing` cannot be resolved")
+        .isInstanceOf(AnalysisException.class);
+  }
+
+  @Test
+  public void createViewWithSubqueryExpressionUsingTempView() {
+    String viewName = "viewWithSubqueryExpression";
+    String tempView = "simpleTempView";
+    String query =
+        String.format("SELECT * FROM %s WHERE id = (SELECT id FROM %s)", tableName, tempView);
+
+    sql("CREATE TEMPORARY VIEW %s AS SELECT id from %s WHERE id = 5", tempView, tableName);
+
+    // the temp view only appears inside a scalar subquery expression
+    String expectedObject = String.format("Cannot create the persistent object %s", viewName);
+    String expectedTempObject =
+        String.format("because it references to the temporary object %s", tempView);
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, query))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(expectedObject)
+        .hasMessageContaining(expectedTempObject);
+  }
+
+  @Test
+  public void createViewWithSubqueryExpressionUsingGlobalTempView() {
+    String viewName = "simpleViewWithSubqueryExpression";
+    String globalTempView = "simpleGlobalTempView";
+    String query =
+        String.format(
+            "SELECT * FROM %s WHERE id = (SELECT id FROM global_temp.%s)",
+            tableName, globalTempView);
+
+    sql(
+        "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id from %s WHERE id = 5",
+        globalTempView, tableName);
+
+    // the global temp view only appears inside a scalar subquery expression
+    String expectedObject = String.format("Cannot create the persistent object %s", viewName);
+    String expectedTempObject =
+        String.format(
+            "because it references to the temporary object global_temp.%s", globalTempView);
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, query))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(expectedObject)
+        .hasMessageContaining(expectedTempObject);
+  }
+
private void insertRows(int numRows) throws NoSuchTableException {
List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows);
for (int i = 1; i <= numRows; i++) {
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index a62f6bb95a..37e7387d69 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
@@ -568,6 +569,36 @@ public class SparkCatalog extends BaseCatalog
String[] columnComments,
Map<String, String> properties)
throws ViewAlreadyExistsException, NoSuchNamespaceException {
+ if (null != asViewCatalog) {
+ Schema icebergSchema = SparkSchemaUtil.convert(schema);
+
+ StringJoiner joiner = new StringJoiner(",");
+ Arrays.stream(queryColumnNames).forEach(joiner::add);
+
+ try {
+ Map<String, String> props =
+ ImmutableMap.<String, String>builder()
+ .putAll(Spark3Util.rebuildCreateProperties(properties))
+ .put("queryColumnNames", joiner.toString())
+ .build();
+ org.apache.iceberg.view.View view =
+ asViewCatalog
+ .buildView(buildIdentifier(ident))
+ .withDefaultCatalog(currentCatalog)
+ .withDefaultNamespace(Namespace.of(currentNamespace))
+ .withQuery("spark", sql)
+ .withSchema(icebergSchema)
+ .withLocation(properties.get("location"))
+ .withProperties(props)
+ .create();
+ return new SparkView(catalogName, view);
+ } catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
+ throw new NoSuchNamespaceException(currentNamespace);
+ } catch (AlreadyExistsException e) {
+ throw new ViewAlreadyExistsException(ident);
+ }
+ }
+
throw new UnsupportedOperationException(
"Creating a view is not supported by catalog: " + catalogName);
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
index 424519623e..5391d75476 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
@@ -35,8 +35,9 @@ import org.apache.spark.sql.types.StructType;
public class SparkView implements org.apache.spark.sql.connector.catalog.View {
+  // View property holding the query column names as a comma-separated list (written by
+  // SparkCatalog#createView); listed in RESERVED_PROPERTIES alongside the other reserved keys.
+  private static final String QUERY_COLUMN_NAMES = "queryColumnNames";
   private static final Set<String> RESERVED_PROPERTIES =
-      ImmutableSet.of("provider", "location", FORMAT_VERSION);
+      ImmutableSet.of("provider", "location", FORMAT_VERSION, QUERY_COLUMN_NAMES);
   private final View icebergView;
   private final String catalogName;
@@ -86,7 +87,9 @@ public class SparkView implements
org.apache.spark.sql.connector.catalog.View {
@Override
public String[] queryColumnNames() {
- return new String[0];
+ return icebergView.properties().containsKey(QUERY_COLUMN_NAMES)
+ ? icebergView.properties().get(QUERY_COLUMN_NAMES).split(",")
+ : new String[0];
}
@Override