alexeykudinkin commented on code in PR #7871:
URL: https://github.com/apache/hudi/pull/7871#discussion_r1097916787


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -28,97 +28,125 @@ import 
org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, TB
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sync.common.HoodieSyncConfig
+import org.apache.hudi.util.JFunction.scalaFunction1Noop
 import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, 
HoodieSparkSqlWriter, SparkAdapterSupport}
-import org.apache.spark.sql.HoodieCatalystExpressionUtils.MatchCast
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.{MatchCast, 
attributeEquals}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, BoundReference, Cast, EqualTo, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, BoundReference, EqualTo, Expression, Literal, 
NamedExpression, PredicateHelper}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
-import org.apache.spark.sql.hudi.HoodieSqlUtils.getMergeIntoTargetTableId
+import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig.combineOptions
-import 
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.CoercedAttributeReference
+import 
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{CoercedAttributeReference,
 encodeAsBase64String, stripCasting, toStructType}
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
-import org.apache.spark.sql.types.{BooleanType, StructType}
+import org.apache.spark.sql.types.{BooleanType, StructField, StructType}
 
 import java.util.Base64
 
 /**
- * The Command for hoodie MergeIntoTable.
- * The match on condition must contain the row key fields currently, so that 
we can use Hoodie
- * Index to speed up the performance.
+ * Hudi's implementation of the {@code MERGE INTO} (MIT) Spark SQL statement.
  *
- * The main algorithm:
+ * NOTE: That this implementation is restricted in a some aspects to 
accommodate for Hudi's crucial
+ *       constraint (of requiring every record to bear unique primary-key): 
merging condition ([[mergeCondition]])
+ *       is currently can only (and must) reference target table's primary-key 
columns (this is necessary to
+ *       leverage Hudi's upserting capabilities including Indexes)
  *
- * We pushed down all the matched and not matched (condition, assignment) 
expression pairs to the
- * ExpressionPayload. And the matched (condition, assignment) expression pairs 
will execute in the
- * ExpressionPayload#combineAndGetUpdateValue to compute the result record, 
while the not matched
- * expression pairs will execute in the ExpressionPayload#getInsertValue.
+ * Following algorithm is applied:
  *
- * For Mor table, it is a litter complex than this. The matched record also 
goes through the getInsertValue
- * and write append to the log. So the update actions & insert actions should 
process by the same
- * way. We pushed all the update actions & insert actions together to the
- * ExpressionPayload#getInsertValue.
+ * <ol>
+ *   <li>Incoming batch ([[sourceTable]]) is reshaped such that it bears 
correspondingly:
+ *   a) (required) "primary-key" column as well as b) (optional) "pre-combine" 
column; this is
+ *   required since MIT statements does not restrict [[sourceTable]]s schema 
to be aligned w/ the
+ *   [[targetTable]]s one, while Hudi's upserting flow expects such columns to 
be present</li>
  *
+ *   <li>After reshaping we're writing [[sourceTable]] as a normal batch using 
Hudi's upserting
+ *   sequence, where special [[ExpressionPayload]] implementation of the 
[[HoodieRecordPayload]]
+ *   is used allowing us to execute updating, deleting and inserting clauses 
like following:</li>
+ *
+ *     <ol>
+ *       <li>All the matched {@code WHEN MATCHED AND ... THEN (DELETE|UPDATE 
...)} conditional clauses
+ *       will produce [[(condition, expression)]] tuples that will be executed 
w/in the
+ *       [[ExpressionPayload#combineAndGetUpdateValue]] against existing (from 
[[targetTable]]) and
+ *       incoming (from [[sourceTable]]) records producing the updated 
one;</li>
+ *
+ *       <li>Not matched {@code WHEN NOT MATCHED AND ... THEN INSERT ...} 
conditional clauses
+ *       will produce [[(condition, expression)]] tuples that will be executed 
w/in [[ExpressionPayload#getInsertValue]]
+ *       against incoming records producing ones to be inserted into target 
table;</li>
+ *     </ol>
+ * </ol>
+ *
+ * TODO explain workflow for MOR tables
  */
 case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends 
HoodieLeafRunnableCommand

Review Comment:
   Changes to this command were required due to 
   
    - Switching from bespoke resolution to Spark's standard one: we need to 
abide by Spark's semantic and had to get rid of some customizations implemented 
previously
    - Cleaned up the code considerably simplifying the implementation by 
getting rid of custom utilities and replacing them w/ Spark's standard ones 
(for ex, to resolve, bind expressions, etc)
    - Adding documentation to elaborate on the overall workflow



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -128,15 +150,151 @@ object HoodieAnalysis {
     //       To work this around, we injecting this as the rule that trails 
pre-CBO, ie it's
     //          - Triggered before CBO, therefore have access to the same 
stats as CBO
     //          - Precedes actual [[customEarlyScanPushDownRules]] invocation
-    optimizerRules += (spark => HoodiePruneFileSourcePartitions(spark))
+    rules += (spark => HoodiePruneFileSourcePartitions(spark))
+
+    rules
+  }
+
+  /**
+   * This rule adjusts output of the [[LogicalRelation]] resolving int Hudi 
tables such that all of the
+   * default Spark resolution could be applied resolving standard Spark SQL 
commands
+   *
+   * <ul>
+   *  <li>`MERGE INTO ...`</li>
+   *  <li>`INSERT INTO ...`</li>
+   *  <li>`UPDATE ...`</li>
+   * </ul>
+   *
+   * even though Hudi tables might be carrying meta-fields that have to be 
ignored during resolution phase.
+   *
+   * Spark >= 3.2 bears fully-fledged support for meta-fields and such antics 
are not required for it:
+   * we just need to annotate corresponding attributes as "metadata" for Spark 
to be able to ignore it.
+   *
+   * In Spark < 3.2 however, this is worked around by simply removing any 
meta-fields from the output
+   * of the [[LogicalRelation]] resolving into Hudi table. Note that, it's a 
safe operation since we
+   * actually need to ignore these values anyway
+   */
+  case class AdaptIngestionTargetLogicalRelations(spark: SparkSession) extends 
Rule[LogicalPlan] {

Review Comment:
   This is the core of the change:
   
    - Here we remove implementation of the bespoke resolution rules for Hudi 
components, instead relying on Spark to resolve these (most of these constructs 
don't have any custom logic relative to vanilla Spark SQL and therefore would 
be perfectly fine resolved by standard Spark resolution rules) 
    - Instead we repurpose these rules to serve as conversion point from 
Spark's standard constructs (like `MergeIntoTable`) into Hudi's ones that 
implement Hudi-specific semantic (`MergeIntoHoodieTableCommand`). Note that, we 
require these constructs be fully resolved prior to us trying to convert them



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -276,337 +457,11 @@ case class HoodieAnalysis(sparkSession: SparkSession) 
extends Rule[LogicalPlan]
   }
 }
 
-/**
- * Rule for resolve hoodie's extended syntax or rewrite some logical plan.
- *
- * @param sparkSession
- */
-case class HoodieResolveReferences(sparkSession: SparkSession) extends 
Rule[LogicalPlan]

Review Comment:
   This is an example of the custom rule that is removed completely



##########
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, 
CurrentTimestamp, Expression, ExtractValue, GetStructField, LambdaFunction}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, 
DeleteAction, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction, 
Window}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.toPrettySQL
+
+/**
+ * NOTE: This code is borrowed from Spark 3.1.3
+ *       This code is borrowed, so that we can have some advanced Spark SQL 
functionality (like Merge Into, for ex)
+ *       in Spark 2.x
+ *
+ *       PLEASE REFRAIN MAKING ANY CHANGES TO THIS CODE UNLESS ABSOLUTELY 
NECESSARY
+ */
+object HoodieSpark2Analysis {
+
+  case class ResolveReferences(spark: SparkSession) extends Rule[LogicalPlan] {
+
+    private val resolver = spark.sessionState.conf.resolver
+
+    override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUp {
+      case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)

Review Comment:
   These rules were borrowed from Spark 3.1.3 to bring support for `Merge Into` 
statement to Spark 2.x, which doesn't have it out of the box



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to