yihua commented on code in PR #18621:
URL: https://github.com/apache/hudi/pull/18621#discussion_r3284082966


##########
hudi-spark-datasource/hudi-spark4.2.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark4_2ExtendedSqlAstBuilder.scala:
##########
@@ -0,0 +1,3567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.parser
+
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
+import org.apache.hudi.spark.sql.parser.{HoodieSqlBaseBaseVisitor, 
HoodieSqlBaseParser}
+import org.apache.hudi.spark.sql.parser.HoodieSqlBaseParser._
+
+import org.antlr.v4.runtime.{ParserRuleContext, Token}
+import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
+import org.apache.spark.sql.catalyst.parser.{EnhancedLogicalPlan, 
ParseException, ParserInterface}
+import 
org.apache.spark.sql.catalyst.parser.ParserUtils.{checkDuplicateClauses, 
checkDuplicateKeys, entry, escapedIdentifier, operationNotAllowed, source, 
string, stringWithoutUnescape, validate, withOrigin}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils, 
DateTimeUtils, IntervalUtils}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
+import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.BucketSpecHelper
+import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
+import org.apache.spark.sql.connector.expressions.{ApplyTransform, 
BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, 
HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, 
YearsTransform}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+import org.apache.spark.util.Utils.isTesting
+import org.apache.spark.util.random.RandomSampler
+
+import javax.xml.bind.DatatypeConverter
+
+import java.util.Locale
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * The AstBuilder for HoodieSqlParser, which converts the parsed AST into a logical plan.
+ * Here we only handle the extended SQL syntax, e.g. MergeInto. For all other SQL syntax
+ * we use the delegate SQL parser, which is the SparkSqlParser.
+ */
+class HoodieSpark4_2ExtendedSqlAstBuilder(conf: SQLConf, delegate: 
ParserInterface)
+  extends HoodieSqlBaseBaseVisitor[AnyRef] with Logging {
+
+  /** Visits `ctx` with this builder and casts the visitor's result to the requested type `T`. */
+  protected def typedVisit[T](ctx: ParseTree): T =
+    ctx.accept(this).asInstanceOf[T]
+
+  /**
+   * Default behavior for every visit method that is not overridden.
+   *
+   * A node with exactly one child yields that child's result. Any other node yields null,
+   * because there is no generic way to combine the results of several children.
+   */
+  override def visitChildren(node: RuleNode): AnyRef =
+    node.getChildCount match {
+      case 1 => node.getChild(0).accept(this)
+      case _ => null
+    }
+
+  /**
+   * Create an aliased table reference; this is typically used in FROM clauses.
+   *
+   * The optional temporal clause is applied to the relation first, then the optional alias,
+   * and finally the optional sample clause.
+   */
+  override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) {
+    val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier())
+    val timeTravelled =
+      UnresolvedRelation(nameParts).optionalMap(ctx.temporalClause)(withTimeTravel)
+    val aliased = mayApplyAliasPlan(ctx.tableAlias, timeTravelled)
+    aliased.optionalMap(ctx.sample)(withSample)
+  }
+
+  /**
+   * Wraps `plan` in a [[TimeTravelRelation]] described by a temporal clause.
+   *
+   * The version is taken verbatim when it is an integer token and via `string` otherwise.
+   * The timestamp expression must not reference columns or contain subqueries; either case
+   * raises a [[ParseException]].
+   */
+  private def withTimeTravel(
+      ctx: TemporalClauseContext,
+      plan: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    val versionToken = ctx.version
+    val version =
+      if (ctx.INTEGER_VALUE != null) Some(versionToken.getText)
+      else Option(versionToken).map(string)
+
+    val timestamp = Option(ctx.timestamp).map(expression)
+    timestamp.foreach { ts =>
+      if (ts.references.nonEmpty) {
+        throw new ParseException(
+          "timestamp expression cannot refer to any columns", ctx.timestamp)
+      }
+      if (SubqueryExpression.hasSubquery(ts)) {
+        throw new ParseException(
+          "timestamp expression cannot contain subqueries", ctx.timestamp)
+      }
+    }
+
+    TimeTravelRelation(plan, timestamp, version)
+  }
+
+  // ============== The following code is forked from org.apache.spark.sql.catalyst.parser.AstBuilder
+  /** Visits the wrapped statement and returns its result as a [[LogicalPlan]]. */
+  override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan =
+    withOrigin(ctx) {
+      val visited = visit(ctx.statement)
+      visited.asInstanceOf[LogicalPlan]
+    }
+
+  /** Builds the expression for a stand-alone named expression. */
+  override def visitSingleExpression(ctx: SingleExpressionContext): Expression =
+    withOrigin(ctx) {
+      val named = ctx.namedExpression
+      visitNamedExpression(named)
+    }
+
+  /** Builds the [[TableIdentifier]] for a stand-alone table identifier. */
+  override def visitSingleTableIdentifier(ctx: SingleTableIdentifierContext): TableIdentifier =
+    withOrigin(ctx) {
+      val identifier = ctx.tableIdentifier
+      visitTableIdentifier(identifier)
+    }
+
+  /** Builds the [[FunctionIdentifier]] for a stand-alone function identifier. */
+  override def visitSingleFunctionIdentifier(
+      ctx: SingleFunctionIdentifierContext): FunctionIdentifier =
+    withOrigin(ctx) {
+      val identifier = ctx.functionIdentifier
+      visitFunctionIdentifier(identifier)
+    }
+
+  /** Returns the name parts of a stand-alone multipart identifier. */
+  override def visitSingleMultipartIdentifier(
+      ctx: SingleMultipartIdentifierContext): Seq[String] =
+    withOrigin(ctx) {
+      val identifier = ctx.multipartIdentifier
+      visitMultipartIdentifier(identifier)
+    }
+
+  /** Builds the [[DataType]] for a stand-alone data type. */
+  override def visitSingleDataType(ctx: SingleDataTypeContext): DataType =
+    withOrigin(ctx) {
+      val typeCtx = ctx.dataType
+      typedVisit[DataType](typeCtx)
+    }
+
+  /** Builds the [[StructType]] described by a stand-alone column type list. */
+  override def visitSingleTableSchema(ctx: SingleTableSchemaContext): StructType = {
+    // The schema is built before `withOrigin` is entered; only the finished value passes
+    // through it.
+    val fields = visitColTypeList(ctx.colTypeList)
+    val tableSchema = StructType(fields)
+    withOrigin(ctx)(tableSchema)
+  }
+
+  /* ********************************************************************************************
+   * Plan parsing
+   * ******************************************************************************************** */
+
+  /** Visits `tree` and returns the result typed as a [[LogicalPlan]]. */
+  protected def plan(tree: ParserRuleContext): LogicalPlan =
+    typedVisit[LogicalPlan](tree)
+
+  /**
+   * Create a top-level plan with Common Table Expressions.
+   *
+   * The query term is planned first, then the optional query-organization clauses are
+   * applied, and finally the optional CTEs wrap the result.
+   */
+  override def visitQuery(ctx: QueryContext): LogicalPlan = withOrigin(ctx) {
+    plan(ctx.queryTerm)
+      .optionalMap(ctx.queryOrganization)(withQueryResultClauses)
+      .optionalMap(ctx.ctes)(withCTE)
+  }
+
+  /** Plans a DML statement and wraps it with its CTEs when a WITH clause is present. */
+  override def visitDmlStatement(ctx: DmlStatementContext): AnyRef = withOrigin(ctx) {
+    plan(ctx.dmlStatementNoWith).optionalMap(ctx.ctes)(withCTE)
+  }
+
+  /**
+   * Wraps `plan` in an [[UnresolvedWith]] built from the named queries of a WITH clause.
+   *
+   * A named query may carry an integer value (the max recursion level); this is rejected
+   * unless the clause is marked RECURSIVE. Duplicate CTE names raise a [[ParseException]].
+   */
+  private def withCTE(ctx: CtesContext, plan: LogicalPlan): LogicalPlan = {
+    val isRecursive = ctx.RECURSIVE() != null
+    val definitions = ctx.namedQuery.asScala.map { namedCtx =>
+      val cte = visitNamedQuery(namedCtx)
+      val maxRecursionLevel: Option[Int] = Option(namedCtx.integerValue()).map { levelCtx =>
+        if (!isRecursive) {
+          operationNotAllowed("Cannot specify MAX RECURSION LEVEL when the CTE is not marked as " +
+            "RECURSIVE", ctx)
+        }
+        getIntegerValue(levelCtx)
+      }
+      (cte.alias, cte, maxRecursionLevel)
+    }
+    // Reject WITH clauses that define the same name more than once.
+    val repeatedNames = definitions
+      .groupBy(_._1)
+      .filter { case (_, sameName) => sameName.size > 1 }
+      .keys
+    if (repeatedNames.nonEmpty) {
+      throw new ParseException(s"CTE definition can't have duplicate names: ${repeatedNames.mkString("'", "', '", "'")}.", ctx)
+    }
+    UnresolvedWith(plan, definitions.toSeq, isRecursive)
+  }
+
+  /**
+   * Reads the integer held by an IntegerValueContext.
+   *
+   * Parameter markers are expected to have been substituted before this point; an
+   * unsubstituted marker trips the assertion.
+   *
+   * @param ctx
+   * The IntegerValueContext to extract the integer from
+   * @return
+   * The integer value
+   */
+  private def getIntegerValue(ctx: IntegerValueContext): Int = {
+    val isUnsubstitutedMarker = ctx.isInstanceOf[ParameterIntegerValueContext]
+    assert(
+      !isUnsubstitutedMarker,
+      "Parameter markers should be substituted before DataTypeAstBuilder processes the " +
+        s"parse tree. Found unsubstituted parameter: ${ctx.getText}")
+    ctx.getText.toInt
+  }
+
+  /**
+   * Create a logical query plan for a hive-style FROM statement body.
+   *
+   * The body is either a TRANSFORM clause or a SELECT clause; both forms share the same
+   * lateral view, WHERE, aggregation, HAVING and WINDOW clauses and read from `plan`.
+   */
+  private def withFromStatementBody(
+      ctx: FromStatementBodyContext,
+      plan: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    if (ctx.transformClause == null) {
+      // Plain SELECT body.
+      withSelectQuerySpecification(
+        ctx,
+        ctx.selectClause,
+        ctx.lateralView,
+        ctx.whereClause,
+        ctx.aggregationClause,
+        ctx.havingClause,
+        ctx.windowClause,
+        plan)
+    } else {
+      // TRANSFORM body.
+      withTransformQuerySpecification(
+        ctx,
+        ctx.transformClause,
+        ctx.lateralView,
+        ctx.whereClause,
+        ctx.aggregationClause,
+        ctx.havingClause,
+        ctx.windowClause,
+        plan)
+    }
+  }
+
+  /**
+   * Plans a hive-style `FROM ... SELECT ...` statement. Each body is planned against the
+   * shared FROM clause and gets its own query-organization clauses; a single body is
+   * returned as is, any other number of bodies is combined with a [[Union]].
+   */
+  override def visitFromStatement(ctx: FromStatementContext): LogicalPlan = withOrigin(ctx) {
+    val source = visitFromClause(ctx.fromClause)
+    val bodies = ctx.fromStatementBody.asScala.map { body =>
+      withFromStatementBody(body, source)
+        .optionalMap(body.queryOrganization)(withQueryResultClauses)
+    }
+    bodies.toSeq match {
+      case Seq(only) => only
+      case all => Union(all)
+    }
+  }
+
+  /**
+   * Create a named logical plan. This is only used for Common Table Expressions.
+   *
+   * When column aliases are given, the query is wrapped in
+   * [[UnresolvedSubqueryColumnAliases]] before it is named.
+   */
+  override def visitNamedQuery(ctx: NamedQueryContext): SubqueryAlias = withOrigin(ctx) {
+    val aliasedQuery: LogicalPlan = plan(ctx.query).optionalMap(ctx.columnAliases) {
+      (aliasesCtx, query) =>
+        UnresolvedSubqueryColumnAliases(visitIdentifierList(aliasesCtx), query)
+    }
+    SubqueryAlias(ctx.name.getText, aliasedQuery)
+  }
+
+  /**
+   * Create a logical plan for several inserts that share one FROM clause. These queries
+   * have the following SQL form:
+   * {{{
+   *   [WITH cte...]?
+   *   FROM src
+   *   [INSERT INTO tbl1 SELECT *]+
+   * }}}
+   * For example:
+   * {{{
+   *   FROM db.tbl1 A
+   *   INSERT INTO dbo.tbl1 SELECT * WHERE A.value = 10 LIMIT 5
+   *   INSERT INTO dbo.tbl2 SELECT * WHERE A.value = 12
+   * }}}
+   * This (Hive) feature cannot be combined with set-operators.
+   */
+  override def visitMultiInsertQuery(ctx: MultiInsertQueryContext): LogicalPlan =
+    withOrigin(ctx) {
+      val source = visitFromClause(ctx.fromClause)
+
+      // One insert plan per body, each reading from the shared FROM clause.
+      val insertPlans = ctx.multiInsertQueryBody.asScala.map { body =>
+        val selectBody = body.fromStatementBody
+        withInsertInto(
+          body.insertInto,
+          withFromStatementBody(selectBody, source)
+            .optionalMap(selectBody.queryOrganization)(withQueryResultClauses))
+      }
+
+      // A single INSERT is returned as is; otherwise the inserts are combined in a Union.
+      insertPlans.toSeq match {
+        case Seq(only) => only
+        case all => Union(all)
+      }
+    }
+
+  /**
+   * Create a logical plan for a regular (single-insert) query: the query term with its
+   * optional query-organization clauses becomes the input of the insert.
+   */
+  override def visitSingleInsertQuery(ctx: SingleInsertQueryContext): LogicalPlan =
+    withOrigin(ctx) {
+      val insertCtx = ctx.insertInto()
+      val input =
+        plan(ctx.queryTerm).optionalMap(ctx.queryOrganization)(withQueryResultClauses)
+      withInsertInto(insertCtx, input)
+    }
+
+  /**
+   * Parameters used for writing a query result to a table, in tuple order:
+   * (UnresolvedRelation, tableColumnList, partitionKeys, ifPartitionNotExists).
+   * A partition key mapped to None has no value given in the partition spec.
+   */
+  type InsertTableParams = (UnresolvedRelation, Seq[String], Map[String, 
Option[String]], Boolean)
+
+  /**
+   * Parameters used for writing a query result to a directory, in tuple order:
+   * (isLocal, CatalogStorageFormat, provider).
+   */
+  type InsertDirParams = (Boolean, CatalogStorageFormat, Option[String])
+
+  /**
+   * Add one of the following operations to the logical plan:
+   * {{{
+   *   INSERT OVERWRITE TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]? [identifierList]
+   *   INSERT INTO [TABLE] tableIdentifier [partitionSpec]  [identifierList]
+   *   INSERT OVERWRITE [LOCAL] DIRECTORY STRING [rowFormat] [createFileFormat]
+   *   INSERT OVERWRITE [LOCAL] DIRECTORY [STRING] tableProvider [OPTIONS tablePropertyList]
+   * }}}
+   * Any other kind of insert context raises a [[ParseException]].
+   */
+  private def withInsertInto(
+      ctx: InsertIntoContext,
+      query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    // INSERT INTO and INSERT OVERWRITE TABLE differ only in the overwrite flag.
+    def tableInsert(params: InsertTableParams, overwrite: Boolean): LogicalPlan = {
+      val (relation, cols, partition, ifPartitionNotExists) = params
+      InsertIntoStatement(relation, partition, cols, query, overwrite, ifPartitionNotExists)
+    }
+
+    // Both directory variants always overwrite.
+    def dirInsert(params: InsertDirParams): LogicalPlan = {
+      val (isLocal, storage, provider) = params
+      InsertIntoDir(isLocal, storage, provider, query, overwrite = true)
+    }
+
+    ctx match {
+      case into: InsertIntoTableContext =>
+        tableInsert(visitInsertIntoTable(into), overwrite = false)
+      case overwriteTable: InsertOverwriteTableContext =>
+        tableInsert(visitInsertOverwriteTable(overwriteTable), overwrite = true)
+      case dir: InsertOverwriteDirContext =>
+        dirInsert(visitInsertOverwriteDir(dir))
+      case hiveDir: InsertOverwriteHiveDirContext =>
+        dirInsert(visitInsertOverwriteHiveDir(hiveDir))
+      case _ =>
+        throw new ParseException("Invalid InsertIntoContext", ctx)
+    }
+  }
+
+  /**
+   * Collect the parameters of an INSERT INTO TABLE operation.
+   *
+   * IF NOT EXISTS is not allowed for INSERT INTO, so the last tuple element is always false.
+   */
+  override def visitInsertIntoTable(ctx: InsertIntoTableContext): InsertTableParams =
+    withOrigin(ctx) {
+      val columns =
+        Option(ctx.identifierList()).fold(Seq.empty[String])(visitIdentifierList)
+      val partitionSpec = Option(ctx.partitionSpec)
+        .fold(Map.empty[String, Option[String]])(visitPartitionSpec)
+
+      if (ctx.EXISTS != null) {
+        operationNotAllowed("INSERT INTO ... IF NOT EXISTS", ctx)
+      }
+
+      val target = createUnresolvedRelation(ctx.multipartIdentifier)
+      (target, columns, partitionSpec, false)
+    }
+
+  /**
+   * Collect the parameters of an INSERT OVERWRITE TABLE operation.
+   *
+   * IF NOT EXISTS is rejected when the partition spec contains a key without a value
+   * (a dynamic partition).
+   */
+  override def visitInsertOverwriteTable(ctx: InsertOverwriteTableContext): InsertTableParams =
+    withOrigin(ctx) {
+      assert(ctx.OVERWRITE() != null)
+      val columns =
+        Option(ctx.identifierList()).fold(Seq.empty[String])(visitIdentifierList)
+      val partitionSpec = Option(ctx.partitionSpec)
+        .fold(Map.empty[String, Option[String]])(visitPartitionSpec)
+
+      val ifNotExists = ctx.EXISTS != null
+      val dynamicKeys: Map[String, Option[String]] =
+        partitionSpec.filter { case (_, value) => value.isEmpty }
+      if (ifNotExists && dynamicKeys.nonEmpty) {
+        operationNotAllowed("IF NOT EXISTS with dynamic partitions: " +
+          dynamicKeys.keys.mkString(", "), ctx)
+      }
+
+      val target = createUnresolvedRelation(ctx.multipartIdentifier)
+      (target, columns, partitionSpec, ifNotExists)
+    }
+
+  /**
+   * INSERT OVERWRITE [LOCAL] DIRECTORY is not supported by this builder: this method never
+   * returns and always raises a [[ParseException]].
+   */
+  override def visitInsertOverwriteDir(ctx: InsertOverwriteDirContext): InsertDirParams =
+    withOrigin(ctx) {
+      throw new ParseException("INSERT OVERWRITE DIRECTORY is not supported", ctx)
+    }
+
+  /**
+   * The hive-style INSERT OVERWRITE [LOCAL] DIRECTORY is not supported by this builder: this
+   * method never returns and always raises a [[ParseException]].
+   */
+  override def visitInsertOverwriteHiveDir(
+      ctx: InsertOverwriteHiveDirContext): InsertDirParams =
+    withOrigin(ctx) {
+      throw new ParseException("INSERT OVERWRITE DIRECTORY is not supported", ctx)
+    }
+
+  /**
+   * Returns the table alias of `ctx`, if any.
+   *
+   * @param ctx the alias context; may be null when the statement has no alias
+   * @param op  the statement name used in the error message (e.g. "DELETE")
+   * @throws ParseException if the alias also lists column aliases
+   */
+  private def getTableAliasWithoutColumnAlias(
+      ctx: TableAliasContext,
+      op: String): Option[String] =
+    Option(ctx).flatMap { aliasCtx =>
+      if (aliasCtx.identifierList() != null) {
+        throw new ParseException(s"Columns aliases are not allowed in $op.", aliasCtx.identifierList())
+      }
+      Option(aliasCtx.strictIdentifier()).map(_.getText)
+    }
+
+  /**
+   * Build a [[DeleteFromTable]] plan for a DELETE statement.
+   *
+   * The target may carry a table alias; column aliases are rejected. The WHERE clause is
+   * optional: when it is absent the condition defaults to the literal `TRUE`. Calling
+   * `Option.get` on the missing predicate would instead fail with a
+   * `NoSuchElementException`.
+   */
+  override def visitDeleteFromTable(ctx: DeleteFromTableContext): LogicalPlan =
+    withOrigin(ctx) {
+      val table = createUnresolvedRelation(ctx.multipartIdentifier())
+      val tableAlias = getTableAliasWithoutColumnAlias(ctx.tableAlias(), "DELETE")
+      val aliasedTable = tableAlias.map(SubqueryAlias(_, table)).getOrElse(table)
+      // No WHERE clause means "delete everything", expressed as an always-true condition.
+      val condition: Expression = Option(ctx.whereClause())
+        .map(where => expression(where.booleanExpression()))
+        .getOrElse(Literal.TrueLiteral)
+      DeleteFromTable(aliasedTable, condition)
+    }
+
+  /**
+   * Build an [[UpdateTable]] plan for an UPDATE statement. The target may carry a table
+   * alias (column aliases are rejected) and the WHERE clause is optional.
+   */
+  override def visitUpdateTable(ctx: UpdateTableContext): LogicalPlan = withOrigin(ctx) {
+    val target = createUnresolvedRelation(ctx.multipartIdentifier())
+    val aliasedTarget = getTableAliasWithoutColumnAlias(ctx.tableAlias(), "UPDATE")
+      .map(SubqueryAlias(_, target))
+      .getOrElse(target)
+    val assignments = withAssignments(ctx.setClause().assignmentList())
+    val condition =
+      Option(ctx.whereClause()).map(where => expression(where.booleanExpression()))
+
+    UpdateTable(aliasedTarget, assignments, condition)
+  }
+
+  /** Converts each `key = value` entry of an assignment list into an [[Assignment]]. */
+  private def withAssignments(assignCtx: AssignmentListContext): Seq[Assignment] =
+    withOrigin(assignCtx) {
+      val assignments = for (assign <- assignCtx.assignment().asScala) yield {
+        val target = UnresolvedAttribute(visitMultipartIdentifier(assign.key))
+        Assignment(target, expression(assign.value))
+      }
+      assignments.toSeq
+    }
+
+  /**
+   * Build a [[MergeIntoTable]] plan for a MERGE INTO statement.
+   *
+   * The target is a table; the source is either a table or a subquery. Both may carry a
+   * table alias (column aliases are rejected). Every WHEN MATCHED clause becomes a delete
+   * or update action and every WHEN NOT MATCHED clause becomes an insert action.
+   *
+   * A [[ParseException]] is raised when no source is given, when an INSERT lists a different
+   * number of columns and values, when there is no WHEN clause at all, or when a MATCHED /
+   * NOT MATCHED clause other than the last one of its kind omits its condition.
+   */
+  override def visitMergeIntoTable(ctx: MergeIntoTableContext): LogicalPlan = withOrigin(ctx) {
+    val targetTable = createUnresolvedRelation(ctx.target)
+    val targetTableAlias = getTableAliasWithoutColumnAlias(ctx.targetAlias, "MERGE")
+    val aliasedTarget = targetTableAlias.map(SubqueryAlias(_, targetTable)).getOrElse(targetTable)
+
+    val sourceTableOrQuery = if (ctx.source != null) {
+      createUnresolvedRelation(ctx.source)
+    } else if (ctx.sourceQuery != null) {
+      visitQuery(ctx.sourceQuery)
+    } else {
+      // `ctx.source` is null on this branch, so the error is anchored to the statement
+      // context instead; a null context cannot supply the error position.
+      throw new ParseException("Empty source for merge: you should specify a source" +
+        " table/subquery in merge.", ctx)
+    }
+    val sourceTableAlias = getTableAliasWithoutColumnAlias(ctx.sourceAlias, "MERGE")
+    val aliasedSource =
+      sourceTableAlias.map(SubqueryAlias(_, sourceTableOrQuery)).getOrElse(sourceTableOrQuery)
+
+    val mergeCondition = expression(ctx.mergeCondition)
+
+    // WHEN MATCHED [AND cond] THEN DELETE | UPDATE SET * | UPDATE SET a = b, ...
+    val matchedActions = ctx.matchedClause().asScala.map {
+      clause => {
+        if (clause.matchedAction().DELETE() != null) {
+          DeleteAction(Option(clause.matchedCond).map(expression))
+        } else if (clause.matchedAction().UPDATE() != null) {
+          val condition = Option(clause.matchedCond).map(expression)
+          if (clause.matchedAction().ASTERISK() != null) {
+            UpdateStarAction(condition)
+          } else {
+            UpdateAction(condition, withAssignments(clause.matchedAction().assignmentList()))
+          }
+        } else {
+          // It should not be here.
+          throw new ParseException(s"Unrecognized matched action: ${clause.matchedAction().getText}",
+            clause.matchedAction())
+        }
+      }
+    }
+    // WHEN NOT MATCHED [AND cond] THEN INSERT * | INSERT (cols) VALUES (exprs)
+    val notMatchedActions = ctx.notMatchedClause().asScala.map {
+      clause => {
+        if (clause.notMatchedAction().INSERT() != null) {
+          val condition = Option(clause.notMatchedCond).map(expression)
+          if (clause.notMatchedAction().ASTERISK() != null) {
+            InsertStarAction(condition)
+          } else {
+            val columns = clause.notMatchedAction().columns.multipartIdentifier()
+              .asScala.map(attr => UnresolvedAttribute(visitMultipartIdentifier(attr)))
+            val values = clause.notMatchedAction().expression().asScala.map(expression)
+            // Columns and values are paired positionally, so their counts must agree.
+            if (columns.size != values.size) {
+              throw new ParseException("The number of inserted values cannot match the fields.",
+                clause.notMatchedAction())
+            }
+            InsertAction(condition, columns.zip(values).map(kv => Assignment(kv._1, kv._2)).toSeq)
+          }
+        } else {
+          // It should not be here.
+          throw new ParseException(s"Unrecognized not matched action: ${clause.notMatchedAction().getText}",
+            clause.notMatchedAction())
+        }
+      }
+    }
+    if (matchedActions.isEmpty && notMatchedActions.isEmpty) {
+      throw new ParseException("There must be at least one WHEN clause in a MERGE statement", ctx)
+    }
+    // children being empty means that the condition is not set
+    val matchedActionSize = matchedActions.length
+    if (matchedActionSize >= 2 && !matchedActions.init.forall(_.condition.nonEmpty)) {
+      throw new ParseException("When there are more than one MATCHED clauses in a MERGE " +
+        "statement, only the last MATCHED clause can omit the condition.", ctx)
+    }
+    val notMatchedActionSize = notMatchedActions.length
+    if (notMatchedActionSize >= 2 && !notMatchedActions.init.forall(_.condition.nonEmpty)) {
+      throw new ParseException("When there are more than one NOT MATCHED clauses in a MERGE " +
+        "statement, only the last NOT MATCHED clause can omit the condition.", ctx)
+    }
+
+    // NOTE(review): the last two arguments are presumably the NOT MATCHED BY SOURCE actions
+    // and the schema-evolution flag -- confirm against the MergeIntoTable definition.
+    MergeIntoTable(
+      aliasedTarget,
+      aliasedSource,
+      mergeCondition,
+      matchedActions.toSeq,
+      notMatchedActions.toSeq,
+      Seq.empty,
+      false
+    )
+  }
+
+  /**
+   * Create a partition specification map from partition key to optional value.
+   *
+   * Keys are checked for duplicates before the map is built, so a spec such as
+   * PARTITION(a='1', b='2', a='3') does not silently drop a value. The comparison is
+   * case-insensitive unless case-sensitive analysis is enabled. The real semantic check of
+   * partition columns is left to the analyzer.
+   */
+  override def visitPartitionSpec(
+      ctx: PartitionSpecContext): Map[String, Option[String]] = withOrigin(ctx) {
+    val nullAsStringLiteral =
+      conf.getConf(SQLConf.LEGACY_PARSE_NULL_PARTITION_SPEC_AS_STRING_LITERAL)
+    val entries = ctx.partitionVal.asScala.map { valueCtx =>
+      val key = valueCtx.identifier.getText
+      key -> Option(valueCtx.constant).map(visitStringConstant(_, nullAsStringLiteral))
+    }
+    val keysToCheck =
+      if (conf.caseSensitiveAnalysis) entries
+      else entries.map { case (key, value) => key.toLowerCase(Locale.ROOT) -> value }
+    checkDuplicateKeys(keysToCheck.toSeq, ctx)
+    entries.toMap
+  }
+
+  /**
+   * Create a partition specification map in which every key must have a value; a key
+   * without one raises a [[ParseException]].
+   */
+  protected def visitNonOptionalPartitionSpec(
+      ctx: PartitionSpecContext): Map[String, String] = withOrigin(ctx) {
+    visitPartitionSpec(ctx).map { case (key, maybeValue) =>
+      val value = maybeValue.getOrElse {
+        throw new ParseException(s"Found an empty partition key '$key'.", ctx)
+      }
+      key -> value
+    }
+  }
+
+  /**
+   * Convert a constant of any type into a string. This is typically used in DDL commands;
+   * its main purpose is to prevent slight differences caused by back-to-back conversions,
+   * i.e. String -> Literal -> String.
+   *
+   * A null literal yields null, unless `legacyNullAsString` is set, in which case the
+   * literal's string form is returned. Non-literal expressions are rejected.
+   */
+  protected def visitStringConstant(
+      ctx: ConstantContext,
+      legacyNullAsString: Boolean): String = withOrigin(ctx) {
+    val parsed = expression(ctx)
+    parsed match {
+      case Literal(null, _) if !legacyNullAsString =>
+        null
+      case nullLiteral @ Literal(null, _) =>
+        nullLiteral.toString
+      case literal: Literal =>
+        // TODO For v2 commands, we will cast the string back to its actual value,
+        //  which is a waste and can be improved in the future.
+        val asString = Cast(literal, StringType, Some(conf.sessionLocalTimeZone))
+        asString.eval().toString
+      case other =>
+        throw new IllegalArgumentException(s"Only literals are allowed in the " +
+          s"partition spec, but got ${other.sql}")
+    }
+  }
+
+  /**
+   * Add ORDER BY/SORT BY/CLUSTER BY/DISTRIBUTE BY/LIMIT/WINDOWS clauses to the logical plan.
+   * These clauses determine the shape (ordering/partitioning/rows) of the query result.
+   *
+   * Only the clause combinations listed in the match below are supported; any other
+   * combination raises a [[ParseException]].
+   */
+  private def withQueryResultClauses(
+      ctx: QueryOrganizationContext,
+      query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    import ctx._
+
+    // Flags, in order: (ORDER BY, SORT BY, DISTRIBUTE BY, CLUSTER BY) is present.
+    val present = (!order.isEmpty, !sort.isEmpty, !distributeBy.isEmpty, !clusterBy.isEmpty)
+    val withOrder: LogicalPlan = present match {
+      case (true, false, false, false) =>
+        // ORDER BY ...
+        Sort(order.asScala.map(visitSortItem).toSeq, global = true, query)
+      case (false, true, false, false) =>
+        // SORT BY ...
+        Sort(sort.asScala.map(visitSortItem).toSeq, global = false, query)
+      case (false, false, true, false) =>
+        // DISTRIBUTE BY ...
+        withRepartitionByExpression(ctx, expressionList(distributeBy), query)
+      case (false, true, true, false) =>
+        // SORT BY ... DISTRIBUTE BY ...
+        Sort(
+          sort.asScala.map(visitSortItem).toSeq,
+          global = false,
+          withRepartitionByExpression(ctx, expressionList(distributeBy), query))
+      case (false, false, false, true) =>
+        // CLUSTER BY ...
+        val expressions = expressionList(clusterBy)
+        Sort(
+          expressions.map(SortOrder(_, Ascending)),
+          global = false,
+          withRepartitionByExpression(ctx, expressions, query))
+      case (false, false, false, false) =>
+        // [EMPTY]
+        query
+      case _ =>
+        throw new ParseException(
+          "Combination of ORDER BY/SORT BY/DISTRIBUTE BY/CLUSTER BY is not supported", ctx)
+    }
+
+    // WINDOWS
+    val withWindow = withOrder.optionalMap(windowClause)(withWindowClause)
+
+    // LIMIT
+    // - LIMIT ALL is the same as omitting the LIMIT clause
+    withWindow.optional(limit) {
+      Limit(typedVisit(limit), withWindow)
+    }
+  }
+
+  /**
+   * Create a clause for DISTRIBUTE BY: repartitions `query` by `expressions` without an
+   * explicit number of partitions. `ctx` is not used by this implementation.
+   */
+  protected def withRepartitionByExpression(
+      ctx: QueryOrganizationContext,
+      expressions: Seq[Expression],
+      query: LogicalPlan): LogicalPlan =
+    RepartitionByExpression(expressions, query, None)
+
+  /**
+   * Plans a TRANSFORM query specification. Without a FROM clause the query reads from a
+   * [[OneRowRelation]].
+   */
+  override def visitTransformQuerySpecification(
+      ctx: TransformQuerySpecificationContext): LogicalPlan = withOrigin(ctx) {
+    val source: LogicalPlan =
+      if (ctx.fromClause != null) visitFromClause(ctx.fromClause) else OneRowRelation()
+    withTransformQuerySpecification(
+      ctx,
+      ctx.transformClause,
+      ctx.lateralView,
+      ctx.whereClause,
+      ctx.aggregationClause,
+      ctx.havingClause,
+      ctx.windowClause,
+      source)
+  }
+
+  /**
+   * Plans a regular SELECT query specification. Without a FROM clause the query reads from
+   * a [[OneRowRelation]].
+   */
+  override def visitRegularQuerySpecification(
+      ctx: RegularQuerySpecificationContext): LogicalPlan = withOrigin(ctx) {
+    val source: LogicalPlan =
+      if (ctx.fromClause != null) visitFromClause(ctx.fromClause) else OneRowRelation()
+    withSelectQuerySpecification(
+      ctx,
+      ctx.selectClause,
+      ctx.lateralView,
+      ctx.whereClause,
+      ctx.aggregationClause,
+      ctx.havingClause,
+      ctx.windowClause,
+      source)
+  }
+
+  /** Visits every named expression of `ctx`; a null `ctx` yields an empty sequence. */
+  override def visitNamedExpressionSeq(ctx: NamedExpressionSeqContext): Seq[Expression] =
+    for {
+      seqCtx <- Option(ctx).toSeq
+      exprCtx <- seqCtx.namedExpression.asScala
+    } yield typedVisit[Expression](exprCtx)
+
+  /** Visits every expression of `ctx`; a null `ctx` yields an empty sequence. */
+  override def visitExpressionSeq(ctx: ExpressionSeqContext): Seq[Expression] =
+    for {
+      seqCtx <- Option(ctx).toSeq
+      exprCtx <- seqCtx.expression.asScala
+    } yield typedVisit[Expression](exprCtx)
+
+  /**
+   * Create a logical plan using a having clause.
+   *
+   * A condition that is not already a [[Predicate]] is cast to boolean. If the expression
+   * itself is already boolean, the optimizer will get rid of the unnecessary cast.
+   */
+  private def withHavingClause(
+      ctx: HavingClauseContext,
+      plan: LogicalPlan): LogicalPlan = {
+    val condition = expression(ctx.booleanExpression) match {
+      case alreadyPredicate: Predicate => alreadyPredicate
+      case nonPredicate => Cast(nonPredicate, BooleanType)
+    }
+    UnresolvedHaving(condition, plan)
+  }
+
+  /** Create a logical plan using a where clause: filters `plan` by the clause's condition. */
+  private def withWhereClause(ctx: WhereClauseContext, plan: LogicalPlan): LogicalPlan = {
+    val condition = expression(ctx.booleanExpression)
+    Filter(condition, plan)
+  }
+
+  /**
+   * Add a hive-style transform (SELECT TRANSFORM/MAP/REDUCE) query specification to a
+   * logical plan.
+   *
+   * The output attributes come from the typed column list when present, otherwise from the
+   * untyped identifier list (as nullable strings), otherwise they default to the string
+   * columns `key` and `value`. Only the default case is treated as schema-less.
+   */
+  private def withTransformQuerySpecification(
+      ctx: ParserRuleContext,
+      transformClause: TransformClauseContext,
+      lateralView: java.util.List[LateralViewContext],
+      whereClause: WhereClauseContext,
+      aggregationClause: AggregationClauseContext,
+      havingClause: HavingClauseContext,
+      windowClause: WindowClauseContext,
+      relation: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    if (transformClause.setQuantifier != null) {
+      throw new ParseException("TRANSFORM does not support DISTINCT/ALL in inputs", transformClause.setQuantifier)
+    }
+
+    // Decide the output attributes and whether the script output is schema-less.
+    val (outputAttributes, isSchemaLess) =
+      if (transformClause.colTypeList != null) {
+        // Typed return columns.
+        val typed = DataTypeUtils.toAttributes(createSchema(transformClause.colTypeList))
+        (typed, false)
+      } else if (transformClause.identifierSeq != null) {
+        // Untyped return columns.
+        val untyped = visitIdentifierSeq(transformClause.identifierSeq).map { name =>
+          AttributeReference(name, StringType, nullable = true)()
+        }
+        (untyped, false)
+      } else {
+        val defaults = Seq(
+          AttributeReference("key", StringType)(),
+          AttributeReference("value", StringType)())
+        (defaults, true)
+      }
+
+    val input = visitCommonSelectQueryClausePlan(
+      relation,
+      visitExpressionSeq(transformClause.expressionSeq),
+      lateralView,
+      whereClause,
+      aggregationClause,
+      havingClause,
+      windowClause,
+      isDistinct = false)
+
+    val script = string(transformClause.script)
+    val ioSchema = withScriptIOSchema(
+      ctx,
+      transformClause.inRowFormat,
+      transformClause.recordWriter,
+      transformClause.outRowFormat,
+      transformClause.recordReader,
+      isSchemaLess)
+
+    ScriptTransformation(script, outputAttributes, input, ioSchema)
+  }
+
+  /**
+   * Build the plan for a regular (SELECT) query specification on top of `relation`.
+   *
+   * The shared clause handling (lateral views, WHERE, projection or aggregation, HAVING,
+   * DISTINCT, WINDOW) is delegated to [[visitCommonSelectQueryClausePlan]]; any hints attached
+   * to the SELECT clause are then applied on top of the resulting plan via `withHints`.
+   */
+  private def withSelectQuerySpecification(
+      ctx: ParserRuleContext,
+      selectClause: SelectClauseContext,
+      lateralView: java.util.List[LateralViewContext],
+      whereClause: WhereClauseContext,
+      aggregationClause: AggregationClauseContext,
+      havingClause: HavingClauseContext,
+      windowClause: WindowClauseContext,
+      relation: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    // DISTINCT is requested only when a set quantifier exists and it is the DISTINCT keyword.
+    val quantifier = selectClause.setQuantifier()
+    val distinctRequested = quantifier != null && quantifier.DISTINCT() != null
+
+    val projections = visitNamedExpressionSeq(selectClause.namedExpressionSeq)
+    val selectPlan = visitCommonSelectQueryClausePlan(
+      relation,
+      projections,
+      lateralView,
+      whereClause,
+      aggregationClause,
+      havingClause,
+      windowClause,
+      distinctRequested)
+
+    // Fold from the right so that the first hint ends up outermost.
+    selectClause.hints.asScala.foldRight(selectPlan) { (hint, child) =>
+      withHints(hint, child)
+    }
+  }
+
+  /**
+   * Assemble the clauses shared by SELECT and TRANSFORM query specifications, in this order:
+   * lateral views, WHERE, projection or aggregation (with HAVING), DISTINCT and WINDOW.
+   *
+   * Any of the clause contexts may be null, in which case that step is skipped.
+   */
+  def visitCommonSelectQueryClausePlan(
+      relation: LogicalPlan,
+      expressions: Seq[Expression],
+      lateralView: java.util.List[LateralViewContext],
+      whereClause: WhereClauseContext,
+      aggregationClause: AggregationClauseContext,
+      havingClause: HavingClauseContext,
+      windowClause: WindowClauseContext,
+      isDistinct: Boolean): LogicalPlan = {
+    // Lateral views first, then the WHERE filter.
+    val generated = lateralView.asScala.foldLeft(relation)(withGenerate)
+    val filtered = generated.optionalMap(whereClause)(withWhereClause)
+
+    // Every select expression must be named; anonymous ones get an unresolved alias.
+    val projectList = expressions.map {
+      case named: NamedExpression => named
+      case anonymous: Expression => UnresolvedAlias(anonymous)
+    }
+
+    // A projection is only added when there is something to project.
+    def projected: LogicalPlan =
+      if (projectList.isEmpty) filtered else Project(projectList, filtered)
+
+    def legacyHavingAsWhere: Boolean =
+      conf.getConf(SQLConf.LEGACY_HAVING_WITHOUT_GROUP_BY_AS_WHERE)
+
+    val selected = (Option(aggregationClause), Option(havingClause)) match {
+      case (Some(aggregation), _) =>
+        withAggregationClause(aggregation, projectList, filtered)
+          .optionalMap(havingClause)(withHavingClause)
+
+      case (None, Some(having)) if legacyHavingAsWhere =>
+        // Legacy behaviour: HAVING without GROUP BY is treated as a WHERE on the projection.
+        val predicate = expression(having.booleanExpression) match {
+          case p: Predicate => p
+          case e => Cast(e, BooleanType)
+        }
+        Filter(predicate, projected)
+
+      case (None, Some(having)) =>
+        // SQL standard: HAVING without GROUP BY means a global aggregate.
+        withHavingClause(having, Aggregate(Nil, projectList, filtered))
+
+      case (None, None) =>
+        projected
+    }
+
+    val deduplicated = if (isDistinct) Distinct(selected) else selected
+    deduplicated.optionalMap(windowClause)(withWindowClause)
+  }
+
+  // Script Transform's input/output format, expressed as a 4-tuple. Judging by how
+  // withScriptIOSchema destructures it, the elements are: the row-format key/value entries,
+  // an optional serde class, the serde properties, and an optional record reader/writer.
+  type ScriptIOFormat =
+    (Seq[(String, String)], Option[String], Seq[(String, String)], 
Option[String])
+
+  /**
+   * Convert a `ROW FORMAT DELIMITED` clause into a [[ScriptIOFormat]] that carries only
+   * row-format entries (no serde class, serde properties or reader/writer).
+   *
+   * A `LINES TERMINATED BY` value other than a newline fails validation.
+   */
+  protected def getRowFormatDelimited(ctx: RowFormatDelimitedContext): ScriptIOFormat = {
+    // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
+    // expects a seq of pairs in which the old parsers' token names are used as keys.
+    // Transforming the result of visitRowFormatDelimited would be quite a bit messier than
+    // retrieving the key value pairs ourselves.
+    val delimiterEntries =
+      entry("TOK_TABLEROWFORMATFIELD", ctx.fieldsTerminatedBy) ++
+        entry("TOK_TABLEROWFORMATCOLLITEMS", ctx.collectionItemsTerminatedBy) ++
+        entry("TOK_TABLEROWFORMATMAPKEYS", ctx.keysTerminatedBy) ++
+        entry("TOK_TABLEROWFORMATNULL", ctx.nullDefinedAs)
+
+    val lineEntries = Option(ctx.linesSeparatedBy).toSeq.map { token =>
+      val terminator = string(token)
+      validate(
+        terminator == "\n",
+        s"LINES TERMINATED BY only supports newline '\\n' right now: $terminator",
+        ctx)
+      "TOK_TABLEROWFORMATLINES" -> terminator
+    }
+
+    (delimiterEntries ++ lineEntries, None, Seq.empty, None)
+  }
+
+  /**
+   * Create a [[ScriptInputOutputSchema]] from the input and output row formats of a
+   * TRANSFORM clause.
+   *
+   * Only `ROW FORMAT DELIMITED` (or no row format at all) is accepted here; a serde row format
+   * raises a [[ParseException]]. The `recordWriter` and `recordReader` tokens are not used by
+   * this implementation.
+   */
+  protected def withScriptIOSchema(
+      ctx: ParserRuleContext,
+      inRowFormat: RowFormatContext,
+      recordWriter: Token,
+      outRowFormat: RowFormatContext,
+      recordReader: Token,
+      schemaLess: Boolean): ScriptInputOutputSchema = {
+
+    def toIOFormat(rowFormat: RowFormatContext): ScriptIOFormat = rowFormat match {
+      // SPARK-32106: When there is no definition about format, we return empty result
+      // to use a built-in default Serde in SparkScriptTransformationExec.
+      case null =>
+        (Nil, None, Seq.empty, None)
+
+      case delimited: RowFormatDelimitedContext =>
+        getRowFormatDelimited(delimited)
+
+      case _: RowFormatSerdeContext =>
+        throw new ParseException("TRANSFORM with serde is only supported in hive mode", ctx)
+    }
+
+    val (inputFormat, inputSerdeClass, inputSerdeProps, inputReader) =
+      toIOFormat(inRowFormat)
+    val (outputFormat, outputSerdeClass, outputSerdeProps, outputWriter) =
+      toIOFormat(outRowFormat)
+
+    ScriptInputOutputSchema(
+      inputFormat, outputFormat,
+      inputSerdeClass, outputSerdeClass,
+      inputSerdeProps, outputSerdeProps,
+      inputReader, outputWriter,
+      schemaLess)
+  }
+
+  /**
+   * Create a logical plan for a 'FROM' clause. Multiple comma separated relations are
+   * combined left to right with a condition-less inner join (or a lateral join when the
+   * relation is marked LATERAL). The result is then either pivoted or extended with the
+   * clause's lateral views; combining both is rejected.
+   */
+  override def visitFromClause(ctx: FromClauseContext): LogicalPlan = withOrigin(ctx) {
+    val joined = ctx.relation.asScala.foldLeft(null: LogicalPlan) { (accumulated, relationCtx) =>
+      val primary = plan(relationCtx.relationPrimary)
+      // `accumulated` is null for the first relation, in which case `primary` is used as is.
+      val combined = primary.optionalMap(accumulated) { (previous, current) =>
+        if (relationCtx.LATERAL == null) {
+          Join(previous, current, Inner, None, JoinHint.NONE)
+        } else {
+          if (!relationCtx.relationPrimary.isInstanceOf[AliasedQueryContext]) {
+            throw new ParseException(
+              s"LATERAL can only be used with subquery", relationCtx.relationPrimary)
+          }
+          LateralJoin(previous, LateralSubquery(current), Inner, None)
+        }
+      }
+      withJoinRelations(combined, relationCtx)
+    }
+
+    if (ctx.pivotClause() == null) {
+      ctx.lateralView.asScala.foldLeft(joined)(withGenerate)
+    } else {
+      if (!ctx.lateralView.isEmpty) {
+        throw new ParseException(
+          "LATERAL cannot be used together with PIVOT in FROM clause", ctx)
+      }
+      withPivot(ctx.pivotClause, joined)
+    }
+  }
+
+  /**
+   * Connect two queries by a Set operator.
+   *
+   * Supported Set operators are:
+   * - UNION [ DISTINCT | ALL ]
+   * - EXCEPT [ DISTINCT | ALL ]
+   * - MINUS [ DISTINCT | ALL ]
+   * - INTERSECT [DISTINCT | ALL]
+   *
+   * MINUS is handled exactly like EXCEPT. Without ALL, UNION is wrapped in a [[Distinct]].
+   */
+  override def visitSetOperation(ctx: SetOperationContext): LogicalPlan = withOrigin(ctx) {
+    val lhs = plan(ctx.left)
+    val rhs = plan(ctx.right)
+    val keepDuplicates = Option(ctx.setQuantifier()).exists(_.ALL != null)
+    ctx.operator.getType match {
+      case HoodieSqlBaseParser.UNION =>
+        val union = Union(lhs, rhs)
+        if (keepDuplicates) union else Distinct(union)
+      case HoodieSqlBaseParser.INTERSECT =>
+        Intersect(lhs, rhs, isAll = keepDuplicates)
+      case HoodieSqlBaseParser.EXCEPT | HoodieSqlBaseParser.SETMINUS =>
+        Except(lhs, rhs, isAll = keepDuplicates)
+    }
+  }
+
+  /**
+   * Add a [[WithWindowDefinition]] operator to a logical plan.
+   *
+   * Every named window of the WINDOW clause is collected; a name defined more than once, a
+   * reference to an unknown window, or a reference to a window that is itself only a
+   * reference raises a [[ParseException]].
+   */
+  private def withWindowClause(
+      ctx: WindowClauseContext,
+      query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    // Collect all window specifications defined in the WINDOW clause.
+    val baseWindowTuples = ctx.namedWindow.asScala.map { wCtx =>
+      (wCtx.name.getText, typedVisit[WindowSpec](wCtx.windowSpec))
+    }
+    baseWindowTuples.groupBy(_._1).foreach { case (name, definitions) =>
+      if (definitions.size > 1) {
+        throw new ParseException(s"The definition of window '$name' is repetitive", ctx)
+      }
+    }
+    val baseWindowMap = baseWindowTuples.toMap
+
+    // Handle cases like
+    // window w1 as (partition by p_mfgr order by p_name
+    //               range between 2 preceding and 2 following),
+    //        w2 as w1
+    // A strict `map` is used instead of `mapValues`: the latter only builds a lazy view (and is
+    // deprecated on Scala 2.13), which previously had to be materialized in a second pass.
+    val windowDefinitions: Map[String, WindowSpecDefinition] = baseWindowMap.map {
+      case (alias, WindowSpecReference(name)) =>
+        baseWindowMap.get(name) match {
+          case Some(spec: WindowSpecDefinition) =>
+            alias -> spec
+          case Some(_) =>
+            throw new ParseException(
+              s"Window reference '$name' is not a window specification", ctx)
+          case None =>
+            throw new ParseException(s"Cannot resolve window reference '$name'", ctx)
+        }
+      case (alias, spec: WindowSpecDefinition) =>
+        alias -> spec
+    }
+
+    WithWindowDefinition(windowDefinitions, query, forPipeSQL = false)
+  }
+
+  /**
+   * Add an [[Aggregate]] to a logical plan.
+   *
+   * Two syntaxes are handled: grouping expressions that may embed grouping analytics
+   * (CUBE/ROLLUP/GROUPING SETS as expressions), and the plain expression list optionally
+   * followed by GROUPING SETS (...), WITH CUBE or WITH ROLLUP.
+   */
+  private def withAggregationClause(
+      ctx: AggregationClauseContext,
+      selectExpressions: Seq[NamedExpression],
+      query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    val analyticsAwareExprs = ctx.groupingExpressionsWithGroupingAnalytics
+    if (!analyticsAwareExprs.isEmpty) {
+      val groupings = analyticsAwareExprs.asScala.map { groupByExpr =>
+        val analytics = groupByExpr.groupingAnalytics
+        if (analytics == null) {
+          expression(groupByExpr.expression)
+        } else {
+          visitGroupingAnalytics(analytics)
+        }
+      }
+      Aggregate(groupings.toSeq, selectExpressions, query)
+    } else {
+      val groupByExpressions = expressionList(ctx.groupingExpressions)
+      val groupings: Seq[Expression] =
+        if (ctx.GROUPING != null) {
+          // GROUP BY ... GROUPING SETS (...)
+          // `groupByExpressions` can be non-empty for Hive compatibility. It may add extra
+          // grouping expressions that do not exist in GROUPING SETS (...), and the value is
+          // always null. For example, in
+          // `SELECT a, b, c FROM ... GROUP BY a, b, c GROUPING SETS (a, b)`
+          // the output of column `c` is always null.
+          val sets = ctx.groupingSet.asScala.map { setCtx =>
+            setCtx.expression.asScala.map(e => expression(e)).toSeq
+          }
+          Seq(GroupingSets(sets.toSeq, groupByExpressions))
+        } else if (ctx.CUBE != null) {
+          // GROUP BY ... WITH CUBE
+          Seq(Cube(groupByExpressions.map(Seq(_))))
+        } else if (ctx.ROLLUP != null) {
+          // GROUP BY ... WITH ROLLUP
+          Seq(Rollup(groupByExpressions.map(Seq(_))))
+        } else {
+          groupByExpressions
+        }
+      Aggregate(groupings, selectExpressions, query)
+    }
+  }
+
+  /**
+   * Create a [[BaseGroupingSets]] expression for CUBE(...), ROLLUP(...) or GROUPING SETS(...).
+   *
+   * CUBE and ROLLUP reject empty grouping sets. GROUPING SETS elements may themselves be
+   * grouping analytics, whose selected group-by expressions are flattened into the result.
+   */
+  override def visitGroupingAnalytics(
+      groupingAnalytics: GroupingAnalyticsContext): BaseGroupingSets = {
+    val explicitSets = groupingAnalytics.groupingSet.asScala
+      .map(_.expression.asScala.map(e => expression(e)).toSeq)
+
+    // e.g. CUBE(A, B, (A, B), ()) and ROLLUP(A, B, (A, B), ()) are not supported.
+    def rejectEmptySets(kind: String): Unit = {
+      if (explicitSets.exists(_.isEmpty)) {
+        throw new ParseException(
+          s"Empty set in $kind grouping sets is not supported.", groupingAnalytics)
+      }
+    }
+
+    if (groupingAnalytics.CUBE != null) {
+      rejectEmptySets("CUBE")
+      Cube(explicitSets.toSeq)
+    } else if (groupingAnalytics.ROLLUP != null) {
+      rejectEmptySets("ROLLUP")
+      Rollup(explicitSets.toSeq)
+    } else {
+      assert(groupingAnalytics.GROUPING != null && groupingAnalytics.SETS != null)
+      val flattenedSets = groupingAnalytics.groupingElement.asScala.flatMap { element =>
+        Option(element.groupingAnalytics()) match {
+          case Some(nested) =>
+            visitGroupingAnalytics(nested).selectedGroupByExprs
+          case None =>
+            Seq(element.groupingSet().expression().asScala.map(e => expression(e)).toSeq)
+        }
+      }
+      GroupingSets(flattenedSets.toSeq)
+    }
+  }
+
+  /**
+   * Add [[UnresolvedHint]]s to a logical plan. The hint statements are applied from last to
+   * first, so the first statement ends up as the outermost hint.
+   */
+  private def withHints(
+      ctx: HintContext,
+      query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    ctx.hintStatements.asScala.foldRight(query) { (statement, child) =>
+      val hintName = statement.hintName.getText
+      val parameters = statement.parameters.asScala.map(expression).toSeq
+      UnresolvedHint(hintName, parameters, child)
+    }
+  }
+
+  /**
+   * Add a [[Pivot]] to a logical plan. A single pivot column becomes an unresolved attribute;
+   * several pivot columns are combined into a struct.
+   */
+  private def withPivot(
+      ctx: PivotClauseContext,
+      query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    val aggregates = Option(ctx.aggregates).toSeq
+      .flatMap(_.namedExpression.asScala)
+      .map(e => typedVisit[Expression](e))
+
+    val pivotIdentifiers = ctx.pivotColumn.identifiers
+    val pivotColumn =
+      if (pivotIdentifiers.size != 1) {
+        val columns = pivotIdentifiers.asScala.map { identifier =>
+          UnresolvedAttribute.quoted(identifier.getText)
+        }
+        CreateStruct(columns.toSeq)
+      } else {
+        UnresolvedAttribute.quoted(ctx.pivotColumn.identifier.getText)
+      }
+
+    val pivotValues = ctx.pivotValues.asScala.map(visitPivotValue).toSeq
+    Pivot(None, pivotColumn, pivotValues, aggregates, query)
+  }
+
+  /**
+   * Create a Pivot column value, wrapped in an [[Alias]] when an identifier is given.
+   */
+  override def visitPivotValue(ctx: PivotValueContext): Expression = withOrigin(ctx) {
+    val value = expression(ctx.expression)
+    Option(ctx.identifier) match {
+      case Some(alias) => Alias(value, alias.getText)()
+      case None => value
+    }
+  }
+
+  /**
+   * Add a [[Generate]] (Lateral View) to a logical plan.
+   *
+   * The lateral view's table name is lower-cased to form the qualifier of the generated
+   * columns, and the declared column names become the generator output attributes.
+   */
+  private def withGenerate(
+      query: LogicalPlan,
+      ctx: LateralViewContext): LogicalPlan = withOrigin(ctx) {
+    val expressions = expressionList(ctx.expression)
+    // Lower-case with Locale.ROOT so the qualifier does not depend on the JVM default locale
+    // (e.g. under a Turkish locale "I" would otherwise become a dotless "ı").
+    val qualifier = ctx.tblName.getText.toLowerCase(java.util.Locale.ROOT)
+    Generate(
+      UnresolvedGenerator(visitFunctionName(ctx.qualifiedName), expressions),
+      unrequiredChildIndex = Nil,
+      outer = ctx.OUTER != null,
+      Some(qualifier),
+      ctx.colName.asScala.map(_.getText).map(UnresolvedAttribute.quoted).toSeq,
+      query)
+  }
+
+  /**
+   * Create a single relation referenced in a FROM clause: the primary relation followed by
+   * its join relations. This method is used when a part of the join condition is nested, for
+   * example:
+   * {{{
+   *   select * from t1 join (t2 cross join t3) on col1 = col2
+   * }}}
+   */
+  override def visitRelation(ctx: RelationContext): LogicalPlan = withOrigin(ctx) {
+    val primary = plan(ctx.relationPrimary)
+    withJoinRelations(primary, ctx)
+  }
+
+  /**
+   * Join one or more [[LogicalPlan]]s to the current logical plan, folding the relation's
+   * join clauses from left to right.
+   *
+   * LATERAL joins require a subquery on the right side, cannot be combined with USING or
+   * NATURAL, and only support the inner, cross and left outer join types.
+   */
+  private def withJoinRelations(base: LogicalPlan, ctx: RelationContext): LogicalPlan = {
+    ctx.joinRelation.asScala.foldLeft(base) { (joinedSoFar, joinCtx) =>
+      withOrigin(joinCtx) {
+        val isLateral = joinCtx.LATERAL != null
+
+        // A missing or unrecognised join type defaults to an inner join.
+        val baseJoinType = Option(joinCtx.joinType) match {
+          case None => Inner
+          case Some(jt) =>
+            if (jt.CROSS != null) Cross
+            else if (jt.FULL != null) FullOuter
+            else if (jt.SEMI != null) LeftSemi
+            else if (jt.ANTI != null) LeftAnti
+            else if (jt.LEFT != null) LeftOuter
+            else if (jt.RIGHT != null) RightOuter
+            else Inner
+        }
+
+        if (isLateral && !joinCtx.right.isInstanceOf[AliasedQueryContext]) {
+          throw new ParseException(s"LATERAL can only be used with subquery", joinCtx.right)
+        }
+
+        // Resolve the join type and join condition
+        val (joinType, condition) = Option(joinCtx.joinCriteria) match {
+          case Some(criteria) if criteria.USING != null =>
+            if (isLateral) {
+              throw new ParseException("LATERAL join with USING join is not supported", ctx)
+            }
+            val usingColumns = visitIdentifierList(criteria.identifierList)
+            (UsingJoin(baseJoinType, usingColumns), None)
+          case Some(criteria) if criteria.booleanExpression != null =>
+            (baseJoinType, Option(expression(criteria.booleanExpression)))
+          case Some(criteria) =>
+            throw new ParseException(s"Unimplemented joinCriteria: $criteria", ctx)
+          case None if joinCtx.NATURAL != null =>
+            if (isLateral) {
+              throw new ParseException("LATERAL join with NATURAL join is not supported", ctx)
+            }
+            if (baseJoinType == Cross) {
+              throw new ParseException("NATURAL CROSS JOIN is not supported", ctx)
+            }
+            (NaturalJoin(baseJoinType), None)
+          case None =>
+            (baseJoinType, None)
+        }
+
+        // The right side is only planned after every validation above has passed.
+        if (!isLateral) {
+          Join(joinedSoFar, plan(joinCtx.right), joinType, condition, JoinHint.NONE)
+        } else {
+          if (!Seq(Inner, Cross, LeftOuter).contains(joinType)) {
+            throw new ParseException(
+              s"Unsupported LATERAL join type ${joinType.toString}", ctx)
+          }
+          LateralJoin(joinedSoFar, LateralSubquery(plan(joinCtx.right)), joinType, condition)
+        }
+      }
+    }
+  }
+
+  /**
+   * Add a [[Sample]] to a logical plan.
+   *
+   * This currently supports the following sampling methods:
+   * - TABLESAMPLE(x ROWS): Sample the table down to the given number of rows.
+   * - TABLESAMPLE(x PERCENT): Sample the table down to the given percentage. Note that
+   * percentages are defined as a number between 0 and 100.
+   * - TABLESAMPLE(BUCKET x OUT OF y): Sample the table down to a 'x' divided by 'y' fraction.
+   *
+   * Byte-length sampling and BUCKET ... ON ... are parsed but rejected with a
+   * [[ParseException]], as is an empty TABLESAMPLE.
+   */
+  private def withSample(
+      ctx: SampleContext,
+      query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    // Create a sampled plan for the given fraction.
+    def sampleFraction(fraction: Double): Sample = {
+      // The range of fraction accepted by Sample is [0, 1]. Because Hive's block sampling
+      // function takes X PERCENT as the input and the range of X is [0, 100], callers adjust
+      // the fraction before passing it in.
+      val tolerance = RandomSampler.roundingEpsilon
+      validate(
+        fraction >= 0.0 - tolerance && fraction <= 1.0 + tolerance,
+        s"Sampling fraction ($fraction) must be on interval [0, 1]",
+        ctx)
+      val seed = (math.random * 1000).toInt
+      Sample(0.0, fraction, withReplacement = false, seed, query)
+    }
+
+    ctx.sampleMethod() match {
+      case null =>
+        throw new ParseException("TABLESAMPLE does not accept empty inputs.", ctx)
+
+      case byRows: SampleByRowsContext =>
+        Limit(expression(byRows.expression), query)
+
+      case byPercent: SampleByPercentileContext =>
+        val percentage = byPercent.percentage.getText.toDouble
+        val signed = if (byPercent.negativeSign == null) percentage else -percentage
+        sampleFraction(signed / 100.0d)
+
+      case byBytes: SampleByBytesContext =>
+        val bytesStr = byBytes.bytes.getText
+        val message =
+          if (bytesStr.matches("[0-9]+[bBkKmMgG]")) {
+            s"TABLESAMPLE(byteLengthLiteral) is not supported"
+          } else {
+            s"$bytesStr is not a valid byte length literal, " +
+              "expected syntax: DIGIT+ ('B' | 'K' | 'M' | 'G')"
+          }
+        throw new ParseException(message, byBytes)
+
+      case byBucket: SampleByBucketContext if byBucket.ON() != null =>
+        val target = if (byBucket.identifier != null) "colname" else "function"
+        throw new ParseException(
+          s"TABLESAMPLE(BUCKET x OUT OF y ON $target) is not supported", byBucket)
+
+      case byBucket: SampleByBucketContext =>
+        val numerator = byBucket.numerator.getText.toDouble
+        val denominator = byBucket.denominator.getText.toDouble
+        sampleFraction(numerator / denominator)
+    }
+  }
+
+  /**
+   * Create a logical plan for a sub-query by planning the query it wraps.
+   */
+  override def visitSubquery(ctx: SubqueryContext): LogicalPlan = withOrigin(ctx) {
+    val innerQuery = ctx.query
+    plan(innerQuery)
+  }
+
+  /**
+   * Create an un-aliased table reference as an [[UnresolvedRelation]]. This is typically used
+   * for top-level table references, for example:
+   * {{{
+   *   INSERT INTO db.tbl2
+   *   TABLE db.tbl1
+   * }}}
+   */
+  override def visitTable(ctx: TableContext): LogicalPlan = withOrigin(ctx) {
+    val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier)
+    UnresolvedRelation(nameParts)
+  }
+
+  /**
+   * Create a table-valued function call with arguments, e.g. range(1000). Column aliases and
+   * a table alias are applied when present; a function name qualified with a database is
+   * rejected.
+   */
+  override def visitTableValuedFunction(
+      ctx: TableValuedFunctionContext): LogicalPlan = withOrigin(ctx) {
+    val functionTable = ctx.functionTable
+    val columnAliases = Option(functionTable.tableAlias.identifierList)
+      .map(visitIdentifierList)
+      .getOrElse(Seq.empty)
+
+    val functionName = getFunctionIdentifier(functionTable.functionName)
+    if (functionName.database.nonEmpty) {
+      operationNotAllowed(
+        s"table valued function cannot specify database name: $functionName", ctx)
+    }
+
+    val arguments = functionTable.expression.asScala.map(expression).toSeq
+    val tvf = UnresolvedTableValuedFunction(functionName, arguments)
+    val withColumnAliases =
+      if (columnAliases.isEmpty) tvf else UnresolvedTVFAliases(functionName, tvf, columnAliases)
+
+    withColumnAliases.optionalMap(functionTable.tableAlias.strictIdentifier)(aliasPlan)
+  }
+
+  /**
+   * Create an inline table (a virtual table in Hive parlance). When no column aliases are
+   * given, columns are named `col1`, `col2`, ... based on the width of the first row.
+   */
+  override def visitInlineTable(ctx: InlineTableContext): LogicalPlan = withOrigin(ctx) {
+    // inline table comes in two styles:
+    // style 1: values (1), (2), (3)  -- multiple columns are supported
+    // style 2: values 1, 2, 3  -- only a single column is supported here
+    val rows = ctx.expression.asScala.map { rowCtx =>
+      expression(rowCtx) match {
+        case struct: CreateNamedStruct => struct.valExprs // style 1
+        case single => Seq(single) // style 2
+      }
+    }
+
+    val tableAlias = ctx.tableAlias
+    val columnNames = Option(tableAlias.identifierList) match {
+      case Some(identifiers) => visitIdentifierList(identifiers)
+      case None => Seq.tabulate(rows.head.size)(i => s"col${i + 1}")
+    }
+
+    val inlineTable = UnresolvedInlineTable(columnNames, rows.toSeq)
+    inlineTable.optionalMap(tableAlias.strictIdentifier)(aliasPlan)
+  }
+
+  /**
+   * Create an alias (SubqueryAlias) for a join relation, after applying its optional sample.
+   * This is practically the same as visitAliasedQuery and visitNamedExpression, ANTLR4 however
+   * requires us to use 3 different hooks. We could add alias names for output columns, for
+   * example:
+   * {{{
+   *   SELECT a, b, c, d FROM (src1 s1 INNER JOIN src2 s2 ON s1.id = s2.id) dst(a, b, c, d)
+   * }}}
+   */
+  override def visitAliasedRelation(
+      ctx: AliasedRelationContext): LogicalPlan = withOrigin(ctx) {
+    val sampled = plan(ctx.relation).optionalMap(ctx.sample)(withSample)
+    mayApplyAliasPlan(ctx.tableAlias, sampled)
+  }
+
+  /**
+   * Create an alias (SubqueryAlias) for a sub-query, after applying its optional sample. This
+   * is practically the same as visitAliasedRelation and visitNamedExpression, ANTLR4 however
+   * requires us to use 3 different hooks. We could add alias names for output columns, for
+   * example:
+   * {{{
+   *   SELECT col1, col2 FROM testData AS t(col1, col2)
+   * }}}
+   */
+  override def visitAliasedQuery(ctx: AliasedQueryContext): LogicalPlan = withOrigin(ctx) {
+    val sampled = plan(ctx.query).optionalMap(ctx.sample)(withSample)
+    if (ctx.tableAlias.strictIdentifier != null) {
+      mayApplyAliasPlan(ctx.tableAlias, sampled)
+    } else {
+      // For un-aliased subqueries, use a default alias name that is not likely to conflict with
+      // normal subquery names, so that parent operators can only access the columns in subquery
+      // by unqualified names. Users can still use this special qualifier to access columns if
+      // they know it, but that's not recommended.
+      SubqueryAlias("__auto_generated_subquery_name", sampled)
+    }
+  }
+
+  /**
+   * Create an alias ([[SubqueryAlias]]) for a [[LogicalPlan]], named after the text of the
+   * given alias context.
+   */
+  private def aliasPlan(alias: ParserRuleContext, plan: LogicalPlan): LogicalPlan = {
+    val aliasName = alias.getText
+    SubqueryAlias(aliasName, plan)
+  }
+
+  /**
+   * If a table alias is specified in a FROM clause, wrap the plan in a [[SubqueryAlias]],
+   * adding column aliases underneath it when a column list is given as well. Without a table
+   * alias the plan is returned unchanged.
+   */
+  private def mayApplyAliasPlan(
+      tableAlias: TableAliasContext,
+      plan: LogicalPlan): LogicalPlan = {
+    Option(tableAlias.strictIdentifier).fold(plan) { identifier =>
+      val alias = identifier.getText
+      val aliasedColumns = Option(tableAlias.identifierList).fold(plan) { columns =>
+        UnresolvedSubqueryColumnAliases(visitIdentifierList(columns), plan)
+      }
+      SubqueryAlias(alias, aliasedColumns)
+    }
+  }
+
+  /**
+   * Create a Sequence of Strings for a parenthesis enclosed alias list, by delegating to
+   * [[visitIdentifierSeq]] for the identifiers inside the parentheses.
+   */
+  override def visitIdentifierList(
+      ctx: IdentifierListContext): Seq[String] = withOrigin(ctx) {
+    val identifiers = ctx.identifierSeq
+    visitIdentifierSeq(identifiers)
+  }
+
+  /**
+   * Create a Sequence of Strings for an identifier list, using each identifier's text.
+   */
+  override def visitIdentifierSeq(
+      ctx: IdentifierSeqContext): Seq[String] = withOrigin(ctx) {
+    val names = ctx.ident.asScala.map(identifier => identifier.getText)
+    names.toSeq
+  }
+
+  /* 
********************************************************************************************
+   * Table Identifier parsing
+   * 
********************************************************************************************
 */
+
+  /**
+   * Create a [[TableIdentifier]] from a 'tableName' or 'databaseName'.'tableName' pattern.
+   * The database part is optional.
+   */
+  override def visitTableIdentifier(
+      ctx: TableIdentifierContext): TableIdentifier = withOrigin(ctx) {
+    val database = Option(ctx.db).map(_.getText)
+    TableIdentifier(ctx.table.getText, database)
+  }
+
+  /**
+   * Create a [[FunctionIdentifier]] from a 'functionName' or 'databaseName'.'functionName'
+   * pattern. The database part is optional.
+   */
+  override def visitFunctionIdentifier(
+      ctx: FunctionIdentifierContext): FunctionIdentifier = withOrigin(ctx) {
+    val database = Option(ctx.db).map(_.getText)
+    FunctionIdentifier(ctx.function.getText, database)
+  }
+
+  /**
+   * Create a multi-part identifier as the sequence of the texts of its parts.
+   */
+  override def visitMultipartIdentifier(
+      ctx: MultipartIdentifierContext): Seq[String] = withOrigin(ctx) {
+    val parts = ctx.parts.asScala.map(part => part.getText)
+    parts.toSeq
+  }
+
+  /* 
********************************************************************************************
+   * Expression parsing
+   * 
********************************************************************************************
 */
+
+  /**
+   * Create an expression from the given context. This method just passes the context on to
+   * the visitor and only takes care of typing (We assume that the visitor returns an
+   * Expression here).
+   */
+  protected def expression(ctx: ParserRuleContext): Expression = {
+    typedVisit[Expression](ctx)
+  }
+
+  /**
+   * Create sequence of expressions from the given sequence of contexts, preserving order.
+   */
+  private def expressionList(
+      trees: java.util.List[ExpressionContext]): Seq[Expression] = {
+    val visited = trees.asScala.map(tree => expression(tree))
+    visited.toSeq
+  }
+
+  /**
+   * Create a star (i.e. all) expression; this selects all elements (in the specified object).
+   * Both un-targeted (global) and targeted aliases are supported.
+   */
+  override def visitStar(ctx: StarContext): Expression = withOrigin(ctx) {
+    val target = Option(ctx.qualifiedName()).map { qualifiedName =>
+      qualifiedName.identifier.asScala.map(_.getText).toSeq
+    }
+    UnresolvedStar(target)
+  }
+
+  /**
+   * Create an aliased expression if an alias is specified. Both single and multi-aliases are
+   * supported; a single alias name takes precedence over an alias list.
+   */
+  override def visitNamedExpression(
+      ctx: NamedExpressionContext): Expression = withOrigin(ctx) {
+    val child = expression(ctx.expression)
+    (Option(ctx.name), Option(ctx.identifierList)) match {
+      case (Some(name), _) => Alias(child, name.getText)()
+      case (None, Some(aliases)) => MultiAlias(child, visitIdentifierList(aliases))
+      case (None, None) => child
+    }
+  }
+
+  /**
+   * Combine a number of boolean expressions into a balanced expression tree. These expressions
+   * are either combined by a logical [[And]] or a logical [[Or]].
+   *
+   * A balanced binary tree is created because regular left recursive trees cause considerable
+   * performance degradations and can cause stack overflows.
+   */
+  override def visitLogicalBinary(ctx: LogicalBinaryContext): Expression = withOrigin(ctx) {
+    val expressionType = ctx.operator.getType
+    val expressionCombiner = expressionType match {
+      case HoodieSqlBaseParser.AND => And.apply _
+      case HoodieSqlBaseParser.OR => Or.apply _
+    }
+
+    // Walk down the left spine, collecting the operands of every nested binary expression
+    // that uses the same operator. Operands are gathered right-to-left.
+    val contexts = ArrayBuffer(ctx.right)
+    var current = ctx.left
+    var sameOperator = true
+    while (sameOperator) {
+      current match {
+        case lbc: LogicalBinaryContext if lbc.operator.getType == expressionType =>
+          contexts += lbc.right
+          current = lbc.left
+        case _ =>
+          contexts += current
+          sameOperator = false
+      }
+    }
+
+    // Reverse the contexts to have them in the same sequence as in the SQL statement & turn
+    // them into expressions. `reverseIterator` replaces `reverseMap`, which is deprecated on
+    // Scala 2.13; the result is indexed so the lookups below stay constant time.
+    val expressions = contexts.reverseIterator.map(expression).toIndexedSeq
+
+    // Create a balanced tree over the inclusive index range [low, high].
+    def reduceToExpressionTree(low: Int, high: Int): Expression = high - low match {
+      case 0 =>
+        expressions(low)
+      case 1 =>
+        expressionCombiner(expressions(low), expressions(high))
+      case x =>
+        val mid = low + x / 2
+        expressionCombiner(
+          reduceToExpressionTree(low, mid),
+          reduceToExpressionTree(mid + 1, high))
+    }
+
+    reduceToExpressionTree(0, expressions.size - 1)
+  }
+
+  /**
+   * Invert a boolean expression.
+   */
+  override def visitLogicalNot(ctx: LogicalNotContext): Expression = 
withOrigin(ctx) {
+    Not(expression(ctx.booleanExpression()))
+  }
+
+  /**
+   * Create a filtering correlated sub-query (EXISTS).
+   */
+  override def visitExists(ctx: ExistsContext): Expression = {
+    Exists(plan(ctx.query))
+  }
+
+  /**
+   * Create a comparison expression. This compares two expressions. The 
following comparison
+   * operators are supported:
+   * - Equal: '=' or '=='
+   * - Null-safe Equal: '<=>'
+   * - Not Equal: '<>' or '!='
+   * - Less than: '<'
+   * - Less than or Equal: '<='
+   * - Greater than: '>'
+   * - Greater than or Equal: '>='
+   */
+  override def visitComparison(ctx: ComparisonContext): Expression = 
withOrigin(ctx) {
+    val left = expression(ctx.left)
+    val right = expression(ctx.right)
+    val operator = 
ctx.comparisonOperator().getChild(0).asInstanceOf[TerminalNode]
+    operator.getSymbol.getType match {
+      case HoodieSqlBaseParser.EQ =>
+        EqualTo(left, right)
+      case HoodieSqlBaseParser.NSEQ =>
+        EqualNullSafe(left, right)
+      case HoodieSqlBaseParser.NEQ | HoodieSqlBaseParser.NEQJ =>
+        Not(EqualTo(left, right))
+      case HoodieSqlBaseParser.LT =>
+        LessThan(left, right)
+      case HoodieSqlBaseParser.LTE =>
+        LessThanOrEqual(left, right)
+      case HoodieSqlBaseParser.GT =>
+        GreaterThan(left, right)
+      case HoodieSqlBaseParser.GTE =>
+        GreaterThanOrEqual(left, right)
+    }
+  }
+
+  /**
+   * Create a predicated expression. A predicated expression is a normal 
expression with a
+   * predicate attached to it, for example:
+   * {{{
+   *    a + 1 IS NULL
+   * }}}
+   */
+  override def visitPredicated(ctx: PredicatedContext): Expression = 
withOrigin(ctx) {
+    val e = expression(ctx.valueExpression)
+    if (ctx.predicate != null) {
+      withPredicate(e, ctx.predicate)
+    } else {
+      e
+    }
+  }
+
+  /**
+   * Add a predicate to the given expression. Supported expressions are:
+   * - (NOT) BETWEEN
+   * - (NOT) IN
+   * - (NOT) LIKE (ANY | SOME | ALL)
+   * - (NOT) RLIKE
+   * - IS (NOT) NULL.
+   * - IS (NOT) (TRUE | FALSE | UNKNOWN)
+   * - IS (NOT) DISTINCT FROM
+   */
+  private def withPredicate(e: Expression, ctx: PredicateContext): Expression 
= withOrigin(ctx) {
+    // Invert a predicate if it has a valid NOT clause.
+    def invertIfNotDefined(e: Expression): Expression = ctx.NOT match {
+      case null => e
+      case not => Not(e)
+    }
+
+    def getValueExpressions(e: Expression): Seq[Expression] = e match {
+      case c: CreateNamedStruct => c.valExprs
+      case other => Seq(other)
+    }
+
+    // Create the predicate.
+    ctx.kind.getType match {
+      case HoodieSqlBaseParser.BETWEEN =>
+        // BETWEEN is translated to lower <= e && e <= upper
+        invertIfNotDefined(And(
+          GreaterThanOrEqual(e, expression(ctx.lower)),
+          LessThanOrEqual(e, expression(ctx.upper))))
+      case HoodieSqlBaseParser.IN if ctx.query != null =>
+        invertIfNotDefined(InSubquery(getValueExpressions(e), 
ListQuery(plan(ctx.query))))
+      case HoodieSqlBaseParser.IN =>
+        invertIfNotDefined(In(e, ctx.expression.asScala.map(expression).toSeq))
+      case HoodieSqlBaseParser.LIKE =>
+        Option(ctx.quantifier).map(_.getType) match {
+          case Some(HoodieSqlBaseParser.ANY) | Some(HoodieSqlBaseParser.SOME) 
=>
+            validate(!ctx.expression.isEmpty, "Expected something between '(' 
and ')'.", ctx)
+            val expressions = expressionList(ctx.expression)
+            if (expressions.forall(_.foldable) && 
expressions.forall(_.dataType == StringType)) {
+              // If there are many pattern expressions, will throw 
StackOverflowError.
+              // So we use LikeAny or NotLikeAny instead.
+              val patterns = 
expressions.map(_.eval(EmptyRow).asInstanceOf[UTF8String])
+              ctx.NOT match {
+                case null => LikeAny(e, patterns)
+                case _ => NotLikeAny(e, patterns)
+              }
+            } else {
+              ctx.expression.asScala.map(expression)
+                .map(p => invertIfNotDefined(new Like(e, 
p))).toSeq.reduceLeft(Or)
+            }
+          case Some(HoodieSqlBaseParser.ALL) =>
+            validate(!ctx.expression.isEmpty, "Expected something between '(' 
and ')'.", ctx)
+            val expressions = expressionList(ctx.expression)
+            if (expressions.forall(_.foldable) && 
expressions.forall(_.dataType == StringType)) {
+              // If there are many pattern expressions, will throw 
StackOverflowError.
+              // So we use LikeAll or NotLikeAll instead.
+              val patterns = 
expressions.map(_.eval(EmptyRow).asInstanceOf[UTF8String])
+              ctx.NOT match {
+                case null => LikeAll(e, patterns)
+                case _ => NotLikeAll(e, patterns)
+              }
+            } else {
+              ctx.expression.asScala.map(expression)
+                .map(p => invertIfNotDefined(new Like(e, 
p))).toSeq.reduceLeft(And)
+            }
+          case _ =>
+            val escapeChar = Option(ctx.escapeChar).map(string).map { str =>
+              if (str.length != 1) {
+                throw new ParseException("Invalid escape string. Escape string 
must contain only one character.", ctx)
+              }
+              str.charAt(0)
+            }.getOrElse('\\')
+            invertIfNotDefined(Like(e, expression(ctx.pattern), escapeChar))
+        }
+      case HoodieSqlBaseParser.RLIKE =>
+        invertIfNotDefined(RLike(e, expression(ctx.pattern)))
+      case HoodieSqlBaseParser.NULL if ctx.NOT != null =>
+        IsNotNull(e)
+      case HoodieSqlBaseParser.NULL =>
+        IsNull(e)
+      case HoodieSqlBaseParser.TRUE => ctx.NOT match {
+        case null => EqualNullSafe(e, Literal(true))
+        case _ => Not(EqualNullSafe(e, Literal(true)))
+      }
+      case HoodieSqlBaseParser.FALSE => ctx.NOT match {
+        case null => EqualNullSafe(e, Literal(false))
+        case _ => Not(EqualNullSafe(e, Literal(false)))
+      }
+      case HoodieSqlBaseParser.UNKNOWN => ctx.NOT match {
+        case null => IsUnknown(e)
+        case _ => IsNotUnknown(e)
+      }
+      case HoodieSqlBaseParser.DISTINCT if ctx.NOT != null =>
+        EqualNullSafe(e, expression(ctx.right))
+      case HoodieSqlBaseParser.DISTINCT =>
+        Not(EqualNullSafe(e, expression(ctx.right)))
+    }
+  }
+
+  /**
+   * Create a binary arithmetic expression. The following arithmetic operators 
are supported:
+   * - Multiplication: '*'
+   * - Division: '/'
+   * - Hive Long Division: 'DIV'
+   * - Modulo: '%'
+   * - Addition: '+'
+   * - Subtraction: '-'
+   * - Binary AND: '&'
+   * - Binary XOR: '^'
+   * - Binary OR: '|'
+   */
+  override def visitArithmeticBinary(ctx: ArithmeticBinaryContext): Expression 
= withOrigin(ctx) {
+    val left = expression(ctx.left)
+    val right = expression(ctx.right)
+    ctx.operator.getType match {
+      case HoodieSqlBaseParser.ASTERISK =>
+        Multiply(left, right)
+      case HoodieSqlBaseParser.SLASH =>
+        Divide(left, right)
+      case HoodieSqlBaseParser.PERCENT =>
+        Remainder(left, right)
+      case HoodieSqlBaseParser.DIV =>
+        IntegralDivide(left, right)
+      case HoodieSqlBaseParser.PLUS =>
+        Add(left, right)
+      case HoodieSqlBaseParser.MINUS =>
+        Subtract(left, right)
+      case HoodieSqlBaseParser.CONCAT_PIPE =>
+        Concat(left :: right :: Nil)
+      case HoodieSqlBaseParser.AMPERSAND =>
+        BitwiseAnd(left, right)
+      case HoodieSqlBaseParser.HAT =>
+        BitwiseXor(left, right)
+      case HoodieSqlBaseParser.PIPE =>
+        BitwiseOr(left, right)
+    }
+  }
+
+  /**
+   * Create a unary arithmetic expression. The following arithmetic operators 
are supported:
+   * - Plus: '+'
+   * - Minus: '-'
+   * - Bitwise Not: '~'
+   */
+  override def visitArithmeticUnary(ctx: ArithmeticUnaryContext): Expression = 
withOrigin(ctx) {
+    val value = expression(ctx.valueExpression)
+    ctx.operator.getType match {
+      case HoodieSqlBaseParser.PLUS =>
+        UnaryPositive(value)
+      case HoodieSqlBaseParser.MINUS =>
+        UnaryMinus(value)
+      case HoodieSqlBaseParser.TILDE =>
+        BitwiseNot(value)
+    }
+  }
+
+  override def visitCurrentLike(ctx: CurrentLikeContext): Expression = 
withOrigin(ctx) {
+    if (conf.ansiEnabled) {
+      ctx.name.getType match {
+        case HoodieSqlBaseParser.CURRENT_DATE =>
+          CurrentDate()
+        case HoodieSqlBaseParser.CURRENT_TIMESTAMP =>
+          CurrentTimestamp()
+        case HoodieSqlBaseParser.CURRENT_USER =>
+          CurrentUser()
+      }
+    } else {
+      // If the parser is not in ansi mode, we should return 
`UnresolvedAttribute`, in case there
+      // are columns named `CURRENT_DATE` or `CURRENT_TIMESTAMP`.
+      UnresolvedAttribute.quoted(ctx.name.getText)
+    }
+  }
+
+  /**
+   * Create a [[Cast]] expression.
+   */
+  override def visitCast(ctx: CastContext): Expression = withOrigin(ctx) {
+    val rawDataType = typedVisit[DataType](ctx.dataType())
+    val dataType = 
CharVarcharUtils.replaceCharVarcharWithStringForCast(rawDataType)
+    val cast = ctx.name.getType match {
+      case HoodieSqlBaseParser.CAST =>
+        Cast(expression(ctx.expression), dataType)
+
+      case HoodieSqlBaseParser.TRY_CAST =>
+        Cast(expression(ctx.expression), dataType, evalMode = EvalMode.TRY)
+    }
+    cast.setTagValue(Cast.USER_SPECIFIED_CAST, ())
+    cast
+  }
+
+  /**
+   * Create a [[CreateStruct]] expression.
+   */
+  override def visitStruct(ctx: StructContext): Expression = withOrigin(ctx) {
+    CreateStruct.create(ctx.argument.asScala.map(expression).toSeq)
+  }
+
+  /**
+   * Create a [[First]] expression.
+   */
+  override def visitFirst(ctx: FirstContext): Expression = withOrigin(ctx) {
+    val ignoreNullsExpr = ctx.IGNORE != null
+    First(expression(ctx.expression), ignoreNullsExpr).toAggregateExpression()
+  }
+
+  /**
+   * Create a [[Last]] expression.
+   */
+  override def visitLast(ctx: LastContext): Expression = withOrigin(ctx) {
+    val ignoreNullsExpr = ctx.IGNORE != null
+    Last(expression(ctx.expression), ignoreNullsExpr).toAggregateExpression()
+  }
+
+  /**
+   * Create a Position expression.
+   */
+  override def visitPosition(ctx: PositionContext): Expression = 
withOrigin(ctx) {
+    new StringLocate(expression(ctx.substr), expression(ctx.str))
+  }
+
+  /**
+   * Create an Extract expression.
+   */
+  override def visitExtract(ctx: ExtractContext): Expression = withOrigin(ctx) 
{
+    val arguments = Seq(Literal(ctx.field.getText), expression(ctx.source))
+    UnresolvedFunction("extract", arguments, isDistinct = false)
+  }
+
+  /**
+   * Create a Substring/Substr expression.
+   */
+  override def visitSubstring(ctx: SubstringContext): Expression = 
withOrigin(ctx) {
+    if (ctx.len != null) {
+      Substring(expression(ctx.str), expression(ctx.pos), expression(ctx.len))
+    } else {
+      new Substring(expression(ctx.str), expression(ctx.pos))
+    }
+  }
+
+  /**
+   * Create a Trim expression.
+   */
+  override def visitTrim(ctx: TrimContext): Expression = withOrigin(ctx) {
+    val srcStr = expression(ctx.srcStr)
+    val trimStr = Option(ctx.trimStr).map(expression)
+    Option(ctx.trimOption).map(_.getType).getOrElse(HoodieSqlBaseParser.BOTH) 
match {
+      case HoodieSqlBaseParser.BOTH =>
+        StringTrim(srcStr, trimStr)
+      case HoodieSqlBaseParser.LEADING =>
+        StringTrimLeft(srcStr, trimStr)
+      case HoodieSqlBaseParser.TRAILING =>
+        StringTrimRight(srcStr, trimStr)
+      case other =>
+        throw new ParseException("Function trim doesn't support with " +
+          s"type $other. Please use BOTH, LEADING or TRAILING as trim type", 
ctx)
+    }
+  }
+
+  /**
+   * Create an Overlay expression.
+   */
+  override def visitOverlay(ctx: OverlayContext): Expression = withOrigin(ctx) 
{
+    val input = expression(ctx.input)
+    val replace = expression(ctx.replace)
+    val position = expression(ctx.position)
+    val lengthOpt = Option(ctx.length).map(expression)
+    lengthOpt match {
+      case Some(length) => Overlay(input, replace, position, length)
+      case None => new Overlay(input, replace, position)
+    }
+  }
+
+  /**
+   * Create a (windowed) Function expression.
+   */
+  override def visitFunctionCall(ctx: FunctionCallContext): Expression = 
withOrigin(ctx) {
+    // Create the function call.
+    val name = ctx.functionName.getText
+    val isDistinct = Option(ctx.setQuantifier()).exists(_.DISTINCT != null)
+    // Call `toSeq`, otherwise `ctx.argument.asScala.map(expression)` is 
`Buffer` in Scala 2.13
+    val arguments = ctx.argument.asScala.map(expression).toSeq match {
+      case Seq(UnresolvedStar(None))
+        if name.toLowerCase(Locale.ROOT) == "count" && !isDistinct =>
+        // Transform COUNT(*) into COUNT(1).
+        Seq(Literal(1))
+      case expressions =>
+        expressions
+    }
+    val filter = Option(ctx.where).map(expression(_))
+    val ignoreNulls =
+      Option(ctx.nullsOption).map(_.getType == HoodieSqlBaseParser.IGNORE)

Review Comment:
   The signature of `UnresolvedFunction.ignoreNulls` changed between Spark 4.1 
and 4.2 upstream:
   
   - **Spark 4.1**: `ignoreNulls: Boolean` → 4.1 builder calls 
`.getOrElse(false)` to flatten to a `Boolean`.
   - **Spark 4.2**: `ignoreNulls: Option[Boolean]` → 4.2 builder keeps the raw 
`Option`, so the `.getOrElse(false)` is gone.
   
   Bytecode shows it directly:
   - 4.1 `UnresolvedFunction`: `private final boolean ignoreNulls`
   - 4.2 `UnresolvedFunction`: `private final scala.Option<java.lang.Object> 
ignoreNulls`
   
   These `HoodieSparkX_YExtendedSqlAstBuilder` files are copies of Spark's 
upstream `AstBuilder.visitFunctionCall` for each Spark minor, so they track 
that signature change verbatim. The semantic difference upstream is that 4.2 
distinguishes "user didn't specify nulls option" (`None`) from "user wrote 
RESPECT NULLS" (`Some(false)`) — useful for functions where the default 
behavior isn't `false` (e.g., `mode`/percentile-style aggregates where 
unspecified means "implementation default"). 4.1 collapsed both into `false`.
   
   Locations: 
`hudi-spark4.1.x/...HoodieSpark4_1ExtendedSqlAstBuilder.scala:1916-1917` vs 
`hudi-spark4.2.x/...HoodieSpark4_2ExtendedSqlAstBuilder.scala:1916-1917`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to