rdblue commented on code in PR #9332:
URL: https://github.com/apache/iceberg/pull/9332#discussion_r1430715685


##########
spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala:
##########
@@ -122,37 +147,132 @@ class IcebergSparkSqlExtensionsParser(delegate: 
ParserInterface) extends ParserI
     if (isIcebergCommand(sqlTextAfterSubstitution)) {
       parse(sqlTextAfterSubstitution) { parser => 
astBuilder.visit(parser.singleStatement()) }.asInstanceOf[LogicalPlan]
     } else {
-      delegate.parsePlan(sqlText)
+      ViewSubstitutionExecutor.execute(delegate.parsePlan(sqlText))
     }
   }
 
-  object UnresolvedIcebergTable {
+  private object ViewSubstitutionExecutor extends RuleExecutor[LogicalPlan] {
+    private val fixedPoint = FixedPoint(
+      maxIterations,
+      errorOnExceed = true,
+      maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key)
 
-    def unapply(plan: LogicalPlan): Option[LogicalPlan] = {
-      EliminateSubqueryAliases(plan) match {
-        case UnresolvedRelation(multipartIdentifier, _, _) if 
isIcebergTable(multipartIdentifier) =>
-          Some(plan)
-        case _ =>
+    override protected def batches: Seq[Batch] = Seq(Batch("pre-substitution", 
fixedPoint, V2ViewSubstitution))
+  }
+
+  private object V2ViewSubstitution extends Rule[LogicalPlan] {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+    // the reason for handling these cases here is because 
ResolveSessionCatalog exits early for v2 commands
+    override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUp {
+      case u@UnresolvedView(identifier, _, _, _) =>
+        lookupTableOrView(identifier, viewOnly = true).getOrElse(u)
+
+      case u@UnresolvedTableOrView(identifier, _, _) =>
+        lookupTableOrView(identifier).getOrElse(u)
+
+      case CreateView(UnresolvedIdentifier(nameParts, allowTemp), 
userSpecifiedColumns,
+      comment, properties, originalText, query, allowExisting, replace) =>
+        CreateIcebergView(UnresolvedIdentifier(nameParts, allowTemp), 
userSpecifiedColumns,
+          comment, properties, originalText, query, allowExisting, replace)
+
+      case ShowViews(UnresolvedNamespace(multipartIdentifier), pattern, 
output) =>
+        ShowIcebergViews(UnresolvedNamespace(multipartIdentifier), pattern, 
output)
+
+      case DropView(UnresolvedIdentifier(nameParts, allowTemp), ifExists) =>
+        DropIcebergView(UnresolvedIdentifier(nameParts, allowTemp), ifExists)
+    }
+
+    private def expandIdentifier(nameParts: Seq[String]): Seq[String] = {
+      if (!isResolvingView || isReferredTempViewName(nameParts)) return 
nameParts
+
+      if (nameParts.length == 1) {
+        AnalysisContext.get.catalogAndNamespace :+ nameParts.head
+      } else if 
(SparkSession.active.sessionState.catalogManager.isCatalogRegistered(nameParts.head))
 {
+        nameParts
+      } else {
+        AnalysisContext.get.catalogAndNamespace.head +: nameParts
+      }
+    }
+
+    /**
+     * Resolves relations to `ResolvedTable` or 
`Resolved[Temp/Persistent]View`. This is
+     * for resolving DDL and misc commands. Code is copied from Spark's 
Analyzer, but performs
+     * a view lookup before performing a table lookup.
+     */
+    private def lookupTableOrView(
+                                   identifier: Seq[String],
+                                   viewOnly: Boolean = false): 
Option[LogicalPlan] = {
+      lookupTempView(identifier).map { tempView =>
+        ResolvedTempView(identifier.asIdentifier, tempView.tableMeta.schema)
+      }.orElse {
+        val multipartIdent = expandIdentifier(identifier)
+        val catalogAndIdentifier = 
Spark3Util.catalogAndIdentifier(SparkSession.active, multipartIdent.asJava)
+        if (null != catalogAndIdentifier) {
+          
lookupView(SparkSession.active.sessionState.catalogManager.currentCatalog,
+            catalogAndIdentifier.identifier())
+            
.orElse(lookupTable(SparkSession.active.sessionState.catalogManager.currentCatalog,
+              catalogAndIdentifier.identifier()))
+        } else {
           None
+        }
       }
     }
 
-    private def isIcebergTable(multipartIdent: Seq[String]): Boolean = {
-      val catalogAndIdentifier = 
Spark3Util.catalogAndIdentifier(SparkSession.active, multipartIdent.asJava)
-      catalogAndIdentifier.catalog match {
-        case tableCatalog: TableCatalog =>
-          Try(tableCatalog.loadTable(catalogAndIdentifier.identifier))
-            .map(isIcebergTable)
-            .getOrElse(false)
+    private def isResolvingView: Boolean = 
AnalysisContext.get.catalogAndNamespace.nonEmpty
 
-        case _ =>
-          false
+    private def isReferredTempViewName(nameParts: Seq[String]): Boolean = {

Review Comment:
   I don't think this is needed for Iceberg views because Iceberg views 
_cannot_ reference anything other than tables or other Iceberg views. Here's 
the Javadoc for `referredTempViewNames`:
   
   > All the temp view names referred by the current view we are resolving. 
It's used to make sure the relation resolution is consistent between view 
creation and view resolution. For example, if `t` was a permanent table when 
the current view was created, it should still be a permanent table when 
resolving the current view, even if a temp view `t` has been created.
   
   Because we know that all references must be tables or other Iceberg views 
(never temporary views), the set of names that can be resolved as temporary 
views should always be empty.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to