This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f6e377a1e Spark: Support creating views via SQL (#9423)
3f6e377a1e is described below

commit 3f6e377a1e994714bfc41c75d9aec7bdb85c95a9
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Fri Jan 26 16:44:01 2024 +0100

    Spark: Support creating views via SQL (#9423)
---
 .../extensions/IcebergSparkSessionExtensions.scala |   2 +
 .../spark/sql/catalyst/analysis/CheckViews.scala   |  62 +++++
 .../spark/sql/catalyst/analysis/ResolveViews.scala |  28 ++-
 .../catalyst/analysis/RewriteViewCommands.scala    |  61 +++++
 .../plans/logical/views/CreateIcebergView.scala    |  44 ++++
 .../datasources/v2/CreateV2ViewExec.scala          |  98 ++++++++
 .../v2/ExtendedDataSourceV2Strategy.scala          |  16 ++
 .../apache/iceberg/spark/extensions/TestViews.java | 263 +++++++++++++++++++++
 .../org/apache/iceberg/spark/SparkCatalog.java     |  31 +++
 .../org/apache/iceberg/spark/source/SparkView.java |   7 +-
 10 files changed, 609 insertions(+), 3 deletions(-)

diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
index ad9df3994f..3fca29c294 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
@@ -20,6 +20,7 @@
 package org.apache.iceberg.spark.extensions
 
 import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.spark.sql.catalyst.analysis.CheckViews
 import org.apache.spark.sql.catalyst.analysis.ProcedureArgumentCoercion
 import org.apache.spark.sql.catalyst.analysis.ResolveProcedures
 import org.apache.spark.sql.catalyst.analysis.ResolveViews
@@ -37,6 +38,7 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
     extensions.injectResolutionRule { spark => ResolveProcedures(spark) }
     extensions.injectResolutionRule { spark => ResolveViews(spark) }
     extensions.injectResolutionRule { _ => ProcedureArgumentCoercion }
+    extensions.injectCheckRule(_ => CheckViews)
 
     // optimizer extensions
     extensions.injectOptimizerRule { _ => ReplaceStaticInvoke }
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
new file mode 100644
index 0000000000..4a1736764d
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.SchemaUtils
+
+object CheckViews extends (LogicalPlan => Unit) {
+
+  override def apply(plan: LogicalPlan): Unit = {
+    plan foreach {
+      case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _,
+      _, _, _, _, _, _) =>
+        verifyColumnCount(ident, columnAliases, query)
+        SchemaUtils.checkColumnNameDuplication(query.schema.fieldNames, SQLConf.get.resolver)
+
+      case _ => // OK
+    }
+  }
+
+  private def verifyColumnCount(ident: Identifier, columns: Seq[String], query: LogicalPlan): Unit = {
+    if (columns.nonEmpty) {
+      if (columns.length > query.output.length) {
+        throw new AnalysisException(
+          errorClass = "CREATE_VIEW_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+          messageParameters = Map(
+            "viewName" -> ident.toString,
+            "viewColumns" -> columns.mkString(", "),
+            "dataColumns" -> query.output.map(c => c.name).mkString(", ")))
+      } else if (columns.length < query.output.length) {
+        throw new AnalysisException(
+          errorClass = "CREATE_VIEW_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
+          messageParameters = Map(
+            "viewName" -> ident.toString,
+            "viewColumns" -> columns.mkString(", "),
+            "dataColumns" -> query.output.map(c => c.name).mkString(", ")))
+      }
+    }
+  }
+}
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
index a1b826569f..5616f6f70b 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
@@ -38,6 +39,7 @@ import org.apache.spark.sql.connector.catalog.LookupCatalog
 import org.apache.spark.sql.connector.catalog.View
 import org.apache.spark.sql.connector.catalog.ViewCatalog
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.MetadataBuilder
 
 case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
 
@@ -59,6 +61,30 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
       loadView(catalog, ident)
         .map(_ => ResolvedV2View(catalog.asViewCatalog, ident))
         .getOrElse(u)
+
+    case c@CreateIcebergView(ResolvedIdentifier(_, _), _, query, columnAliases, columnComments, _, _, _, _, _, _)
+      if query.resolved && !c.rewritten =>
+      val aliased = aliasColumns(query, columnAliases, columnComments)
+      c.copy(query = aliased, queryColumnNames = query.schema.fieldNames, rewritten = true)
+  }
+
+  private def aliasColumns(
+    plan: LogicalPlan,
+    columnAliases: Seq[String],
+    columnComments: Seq[Option[String]]): LogicalPlan = {
+    if (columnAliases.isEmpty || columnAliases.length != plan.output.length) {
+      plan
+    } else {
+      val projectList = plan.output.zipWithIndex.map { case (attr, pos) =>
+        if (columnComments.apply(pos).isDefined) {
+          val meta = new MetadataBuilder().putString("comment", columnComments.apply(pos).get).build()
+          Alias(attr, columnAliases.apply(pos))(explicitMetadata = Some(meta))
+        } else {
+          Alias(attr, columnAliases.apply(pos))()
+        }
+      }
+      Project(projectList, plan)
+    }
   }
 
   def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match {
@@ -151,7 +177,7 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
   }
 
 
-  implicit class ViewHelper(plugin: CatalogPlugin) {
+  implicit class IcebergViewHelper(plugin: CatalogPlugin) {
     def asViewCatalog: ViewCatalog = plugin match {
       case viewCatalog: ViewCatalog =>
         viewCatalog
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
index 2b35db33c0..066ba59394 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
@@ -19,13 +19,19 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
+import org.apache.spark.sql.catalyst.plans.logical.CreateView
 import org.apache.spark.sql.catalyst.plans.logical.DropView
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.View
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.LookupCatalog
 import org.apache.spark.sql.connector.catalog.ViewCatalog
 
@@ -40,6 +46,20 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
     case DropView(ResolvedView(resolved), ifExists) =>
       DropIcebergView(resolved, ifExists)
+
+    case CreateView(ResolvedView(resolved), userSpecifiedColumns, comment, properties,
+    Some(queryText), query, allowExisting, replace) =>
+      val q = CTESubstitution.apply(query)
+      verifyTemporaryObjectsDontExist(resolved.identifier, q)
+      CreateIcebergView(child = resolved,
+        queryText = queryText,
+        query = q,
+        columnAliases = userSpecifiedColumns.map(_._1),
+        columnComments = userSpecifiedColumns.map(_._2),
+        comment = comment,
+        properties = properties,
+        allowExisting = allowExisting,
+        replace = replace)
   }
 
   private def isTempView(nameParts: Seq[String]): Boolean = {
@@ -62,4 +82,45 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi
         None
     }
   }
+
+  /**
+   * Permanent views are not allowed to reference temp objects
+   */
+  private def verifyTemporaryObjectsDontExist(
+    name: Identifier,
+    child: LogicalPlan): Unit = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+    val tempViews = collectTemporaryViews(child)
+    tempViews.foreach { nameParts =>
+      throw new AnalysisException(
+        errorClass = "INVALID_TEMP_OBJ_REFERENCE",
+        messageParameters = Map(
+          "obj" -> "VIEW",
+          "objName" -> name.name(),
+          "tempObj" -> "VIEW",
+          "tempObjName" -> nameParts.quoted))
+    }
+
+    // TODO: check for temp function names
+  }
+
+  /**
+   * Collect all temporary views and return the identifiers separately
+   */
+  private def collectTemporaryViews(child: LogicalPlan): Seq[Seq[String]] = {
+    def collectTempViews(child: LogicalPlan): Seq[Seq[String]] = {
+      child.flatMap {
+        case unresolved: UnresolvedRelation if isTempView(unresolved.multipartIdentifier) =>
+          Seq(unresolved.multipartIdentifier)
+        case view: View if view.isTempView => Seq(view.desc.identifier.nameParts)
+        case plan => plan.expressions.flatMap(_.flatMap {
+          case e: SubqueryExpression => collectTempViews(e.plan)
+          case _ => Seq.empty
+        })
+      }.distinct
+    }
+
+    collectTempViews(child)
+  }
 }
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
new file mode 100644
index 0000000000..9366d5efe1
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.views
+
+import org.apache.spark.sql.catalyst.plans.logical.BinaryCommand
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+case class CreateIcebergView(
+  child: LogicalPlan,
+  queryText: String,
+  query: LogicalPlan,
+  columnAliases: Seq[String],
+  columnComments: Seq[Option[String]],
+  queryColumnNames: Seq[String] = Seq.empty,
+  comment: Option[String],
+  properties: Map[String, String],
+  allowExisting: Boolean,
+  replace: Boolean,
+  rewritten: Boolean = false) extends BinaryCommand {
+  override def left: LogicalPlan = child
+
+  override def right: LogicalPlan = query
+
+  override protected def withNewChildrenInternal(
+    newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
+    copy(child = newLeft, query = newRight)
+}
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
new file mode 100644
index 0000000000..892e1eb857
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.types.StructType
+import scala.collection.JavaConverters._
+
+
+case class CreateV2ViewExec(
+  catalog: ViewCatalog,
+  ident: Identifier,
+  queryText: String,
+  viewSchema: StructType,
+  columnAliases: Seq[String],
+  columnComments: Seq[Option[String]],
+  queryColumnNames: Seq[String],
+  comment: Option[String],
+  properties: Map[String, String],
+  allowExisting: Boolean,
+  replace: Boolean) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    val currentCatalogName = session.sessionState.catalogManager.currentCatalog.name
+    val currentCatalog = if (!catalog.name().equals(currentCatalogName)) currentCatalogName else null
+    val currentNamespace = session.sessionState.catalogManager.currentNamespace
+
+    val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION
+    val newProperties = properties ++
+      comment.map(ViewCatalog.PROP_COMMENT -> _) +
+      (ViewCatalog.PROP_CREATE_ENGINE_VERSION -> engineVersion,
+        ViewCatalog.PROP_ENGINE_VERSION -> engineVersion)
+
+    if (replace) {
+      // CREATE OR REPLACE VIEW
+      if (catalog.viewExists(ident)) {
+        catalog.dropView(ident)
+      }
+      // FIXME: replaceView API doesn't exist in Spark 3.5
+      catalog.createView(
+        ident,
+        queryText,
+        currentCatalog,
+        currentNamespace,
+        viewSchema,
+        queryColumnNames.toArray,
+        columnAliases.toArray,
+        columnComments.map(c => c.orNull).toArray,
+        newProperties.asJava)
+    } else {
+      try {
+        // CREATE VIEW [IF NOT EXISTS]
+        catalog.createView(
+          ident,
+          queryText,
+          currentCatalog,
+          currentNamespace,
+          viewSchema,
+          queryColumnNames.toArray,
+          columnAliases.toArray,
+          columnComments.map(c => c.orNull).toArray,
+          newProperties.asJava)
+      } catch {
+        case _: ViewAlreadyExistsException if allowExisting => // Ignore
+      }
+    }
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"CreateV2ViewExec: ${ident}"
+  }
+}
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index 44157fc382..0505fe4e30 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.plans.logical.RenameTable
 import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
 import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
 import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
 import org.apache.spark.sql.connector.catalog.Identifier
@@ -107,6 +108,21 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
     case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) =>
       DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil
 
+    case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), queryText, query,
+    columnAliases, columnComments, queryColumnNames, comment, properties, allowExisting, replace, _) =>
+      CreateV2ViewExec(
+        catalog = viewCatalog,
+        ident = ident,
+        queryText = queryText,
+        columnAliases = columnAliases,
+        columnComments = columnComments,
+        queryColumnNames = queryColumnNames,
+        viewSchema = query.schema,
+        comment = comment,
+        properties = properties,
+        allowExisting = allowExisting,
+        replace = replace) :: Nil
+
     case _ => Nil
   }
 
 
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
index 0eb8c96b3c..bf6509afee 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
@@ -40,6 +40,7 @@ import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.source.SimpleRecord;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.view.View;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -886,6 +887,268 @@ public class TestViews extends SparkExtensionsTestBase {
     return viewName + new Random().nextInt(1000000);
   }
 
+  @Test
+  public void createViewIfNotExists() {
+    String viewName = "viewThatAlreadyExists";
+    sql("CREATE VIEW %s AS SELECT id FROM %s", viewName, tableName);
+
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS SELECT id FROM %s", viewName, tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            String.format(
+                "Cannot create view %s.%s because it already exists", NAMESPACE, viewName));
+
+    // using IF NOT EXISTS should work
+    assertThatNoException()
+        .isThrownBy(
+            () -> sql("CREATE VIEW IF NOT EXISTS %s AS SELECT id FROM %s", viewName, tableName));
+  }
+
+  @Test
+  public void createViewWithInvalidSQL() {
+    assertThatThrownBy(() -> sql("CREATE VIEW simpleViewWithInvalidSQL AS invalid SQL"))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Syntax error");
+  }
+
+  @Test
+  public void createViewReferencingTempView() throws NoSuchTableException {
+    insertRows(10);
+    String tempView = "temporaryViewBeingReferencedInAnotherView";
+    String viewReferencingTempView = "viewReferencingTemporaryView";
+
+    sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", tempView, tableName);
+
+    // creating a view that references a TEMP VIEW shouldn't be possible
+    assertThatThrownBy(
+            () -> sql("CREATE VIEW %s AS SELECT id FROM %s", viewReferencingTempView, tempView))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Cannot create the persistent object")
+        .hasMessageContaining(viewReferencingTempView)
+        .hasMessageContaining("of the type VIEW because it references to the temporary object")
+        .hasMessageContaining(tempView);
+  }
+
+  @Test
+  public void createViewReferencingGlobalTempView() throws NoSuchTableException {
+    insertRows(10);
+    String globalTempView = "globalTemporaryViewBeingReferenced";
+    String viewReferencingTempView = "viewReferencingGlobalTemporaryView";
+
+    sql(
+        "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5",
+        globalTempView, tableName);
+
+    // creating a view that references a GLOBAL TEMP VIEW shouldn't be possible
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE VIEW %s AS SELECT id FROM global_temp.%s",
+                    viewReferencingTempView, globalTempView))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Cannot create the persistent object")
+        .hasMessageContaining(viewReferencingTempView)
+        .hasMessageContaining("of the type VIEW because it references to the temporary object")
+        .hasMessageContaining(globalTempView);
+  }
+
+  @Test
+  public void createViewUsingNonExistingTable() {
+    assertThatThrownBy(
+            () -> sql("CREATE VIEW viewWithNonExistingTable AS SELECT id FROM non_existing"))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("The table or view `non_existing` cannot be found");
+  }
+
+  @Test
+  public void createViewWithMismatchedColumnCounts() {
+    String viewName = "viewWithMismatchedColumnCounts";
+
+    assertThatThrownBy(
+            () -> sql("CREATE VIEW %s (id, data) AS SELECT id FROM %s", viewName, tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(String.format("Cannot create view %s.%s", NAMESPACE, viewName))
+        .hasMessageContaining("not enough data columns")
+        .hasMessageContaining("View columns: id, data")
+        .hasMessageContaining("Data columns: id");
+
+    assertThatThrownBy(
+            () -> sql("CREATE VIEW %s (id) AS SELECT id, data FROM %s", viewName, tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(String.format("Cannot create view %s.%s", NAMESPACE, viewName))
+        .hasMessageContaining("too many data columns")
+        .hasMessageContaining("View columns: id")
+        .hasMessageContaining("Data columns: id, data");
+  }
+
+  @Test
+  public void createViewWithColumnAliases() throws NoSuchTableException {
+    insertRows(6);
+    String viewName = "viewWithColumnAliases";
+
+    sql(
+        "CREATE VIEW %s (new_id COMMENT 'ID', new_data COMMENT 'DATA') AS SELECT id, data FROM %s WHERE id <= 3",
+        viewName, tableName);
+
+    View view = viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName));
+    assertThat(view.properties()).containsEntry("queryColumnNames", "id,data");
+
+    assertThat(view.schema().columns()).hasSize(2);
+    Types.NestedField first = view.schema().columns().get(0);
+    assertThat(first.name()).isEqualTo("new_id");
+    assertThat(first.doc()).isEqualTo("ID");
+
+    Types.NestedField second = view.schema().columns().get(1);
+    assertThat(second.name()).isEqualTo("new_data");
+    assertThat(second.doc()).isEqualTo("DATA");
+
+    assertThat(sql("SELECT new_id FROM %s", viewName))
+        .hasSize(3)
+        .containsExactlyInAnyOrder(row(1), row(2), row(3));
+
+    sql("DROP VIEW %s", viewName);
+
+    sql(
+        "CREATE VIEW %s (new_data, new_id) AS SELECT data, id FROM %s WHERE id <= 3",
+        viewName, tableName);
+
+    assertThat(sql("SELECT new_id FROM %s", viewName))
+        .hasSize(3)
+        .containsExactlyInAnyOrder(row(1), row(2), row(3));
+  }
+
+  @Test
+  public void createViewWithDuplicateColumnNames() {
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE VIEW viewWithDuplicateColumnNames (new_id, new_id) AS SELECT id, id FROM %s WHERE id <= 3",
+                    tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("The column `new_id` already exists");
+  }
+
+  @Test
+  public void createViewWithDuplicateQueryColumnNames() throws NoSuchTableException {
+    insertRows(3);
+    String viewName = "viewWithDuplicateQueryColumnNames";
+    String sql = String.format("SELECT id, id FROM %s WHERE id <= 3", tableName);
+
+    // not specifying column aliases in the view should fail
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("The column `id` already exists");
+
+    sql("CREATE VIEW %s (id_one, id_two) AS %s", viewName, sql);
+
+    assertThat(sql("SELECT * FROM %s", viewName))
+        .hasSize(3)
+        .containsExactlyInAnyOrder(row(1, 1), row(2, 2), row(3, 3));
+  }
+
+  @Test
+  public void createViewWithCTE() throws NoSuchTableException {
+    insertRows(10);
+    String viewName = "simpleViewWithCTE";
+    String sql =
+        String.format(
+            "WITH max_by_data AS (SELECT max(id) as max FROM %s) "
+                + "SELECT max, count(1) AS count FROM max_by_data GROUP BY max",
+            tableName);
+
+    sql("CREATE VIEW %s AS %s", viewName, sql);
+
+    assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L));
+  }
+
+  @Test
+  public void createViewWithConflictingNamesForCTEAndTempView() throws NoSuchTableException {
+    insertRows(10);
+    String viewName = "viewWithConflictingNamesForCTEAndTempView";
+    String cteName = "cteName";
+    String sql =
+        String.format(
+            "WITH %s AS (SELECT max(id) as max FROM %s) "
+                + "(SELECT max, count(1) AS count FROM %s GROUP BY max)",
+            cteName, tableName, cteName);
+
+    // create a CTE and a TEMP VIEW with the same name
+    sql("CREATE TEMPORARY VIEW %s AS SELECT * from %s", cteName, tableName);
+    sql("CREATE VIEW %s AS %s", viewName, sql);
+
+    // CTE should take precedence over the TEMP VIEW when data is read
+    assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L));
+  }
+
+  @Test
+  public void createViewWithCTEReferencingTempView() {
+    String viewName = "viewWithCTEReferencingTempView";
+    String tempViewInCTE = "tempViewInCTE";
+    String sql =
+        String.format(
+            "WITH max_by_data AS (SELECT max(id) as max FROM %s) "
+                + "SELECT max, count(1) AS count FROM max_by_data GROUP BY max",
+            tempViewInCTE);
+
+    sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE ID <= 5", tempViewInCTE, tableName);
+
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Cannot create the persistent object")
+        .hasMessageContaining(viewName)
+        .hasMessageContaining("of the type VIEW because it references to the temporary object")
+        .hasMessageContaining(tempViewInCTE);
+  }
+
+  @Test
+  public void createViewWithNonExistingQueryColumn() {
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE VIEW viewWithNonExistingQueryColumn AS SELECT non_existing FROM %s WHERE id <= 3",
+                    tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            "A column or function parameter with name `non_existing` cannot be resolved");
+  }
+
+  @Test
+  public void createViewWithSubqueryExpressionUsingTempView() {
+    String viewName = "viewWithSubqueryExpression";
+    String tempView = "simpleTempView";
+    String sql =
+        String.format("SELECT * FROM %s WHERE id = (SELECT id FROM %s)", tableName, tempView);
+
+    sql("CREATE TEMPORARY VIEW %s AS SELECT id from %s WHERE id = 5", tempView, tableName);
+
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(String.format("Cannot create the persistent object %s", viewName))
+        .hasMessageContaining(
+            String.format("because it references to the temporary object %s", tempView));
+  }
+
+  @Test
+  public void createViewWithSubqueryExpressionUsingGlobalTempView() {
+    String viewName = "simpleViewWithSubqueryExpression";
+    String globalTempView = "simpleGlobalTempView";
+    String sql =
+        String.format(
+            "SELECT * FROM %s WHERE id = (SELECT id FROM global_temp.%s)",
+            tableName, globalTempView);
+
+    sql(
+        "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id from %s WHERE id = 5",
+        globalTempView, tableName);
+
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(String.format("Cannot create the persistent object %s", viewName))
+        .hasMessageContaining(
+            String.format(
+                "because it references to the temporary object global_temp.%s", globalTempView));
+  }
+
   private void insertRows(int numRows) throws NoSuchTableException {
     List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows);
     for (int i = 1; i <= numRows; i++) {
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index a62f6bb95a..37e7387d69 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.StringJoiner;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
@@ -568,6 +569,36 @@ public class SparkCatalog extends BaseCatalog
       String[] columnComments,
       Map<String, String> properties)
       throws ViewAlreadyExistsException, NoSuchNamespaceException {
+    if (null != asViewCatalog) {
+      Schema icebergSchema = SparkSchemaUtil.convert(schema);
+
+      StringJoiner joiner = new StringJoiner(",");
+      Arrays.stream(queryColumnNames).forEach(joiner::add);
+
+      try {
+        Map<String, String> props =
+            ImmutableMap.<String, String>builder()
+                .putAll(Spark3Util.rebuildCreateProperties(properties))
+                .put("queryColumnNames", joiner.toString())
+                .build();
+        org.apache.iceberg.view.View view =
+            asViewCatalog
+                .buildView(buildIdentifier(ident))
+                .withDefaultCatalog(currentCatalog)
+                .withDefaultNamespace(Namespace.of(currentNamespace))
+                .withQuery("spark", sql)
+                .withSchema(icebergSchema)
+                .withLocation(properties.get("location"))
+                .withProperties(props)
+                .create();
+        return new SparkView(catalogName, view);
+      } catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
+        throw new NoSuchNamespaceException(ident.namespace());
+      } catch (AlreadyExistsException e) {
+        throw new ViewAlreadyExistsException(ident);
+      }
+    }
+
     throw new UnsupportedOperationException(
         "Creating a view is not supported by catalog: " + catalogName);
   }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
index 424519623e..5391d75476 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
@@ -35,8 +35,9 @@ import org.apache.spark.sql.types.StructType;
 
 public class SparkView implements org.apache.spark.sql.connector.catalog.View {
 
+  // View property key under which the query output column names are stored as a comma-joined
+  // string (read back by queryColumnNames()). It is also listed in RESERVED_PROPERTIES, presumably
+  // so it is hidden from user-visible properties; that usage is outside this hunk, so confirm.
+  private static final String QUERY_COLUMN_NAMES = "queryColumnNames";
   private static final Set<String> RESERVED_PROPERTIES =
-      ImmutableSet.of("provider", "location", FORMAT_VERSION);
+      ImmutableSet.of("provider", "location", FORMAT_VERSION, 
QUERY_COLUMN_NAMES);
 
   private final View icebergView;
   private final String catalogName;
@@ -86,7 +87,9 @@ public class SparkView implements org.apache.spark.sql.connector.catalog.View {
 
   @Override
   public String[] queryColumnNames() {
-    return new String[0];
+    // Returns the query output column names stored in the "queryColumnNames" view property
+    // (written comma-joined by SparkCatalog#createView in this change), or an empty array when
+    // the property is absent.
+    // NOTE(review): String#split returns a one-element array [""] when the stored value is empty,
+    // and a column name that itself contains ',' cannot round-trip through this comma encoding.
+    // Confirm whether either case can occur for views written by other engines or quoted names.
+    return icebergView.properties().containsKey(QUERY_COLUMN_NAMES)
+        ? icebergView.properties().get(QUERY_COLUMN_NAMES).split(",")
+        : new String[0];
   }
 
   @Override


Reply via email to