alexeykudinkin commented on code in PR #6361:
URL: https://github.com/apache/hudi/pull/6361#discussion_r957962272


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -26,127 +26,163 @@ import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, SparkAdapterSupport}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Cast, EqualTo, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, EqualTo, Expression, Literal, NamedExpression, PredicateHelper}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
-import org.apache.spark.sql.hudi.HoodieSqlUtils.getMergeIntoTargetTableId
+import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
+import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.sameNamedExpr
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
 import org.apache.spark.sql.hudi.{ProvidesHoodieConfig, SerDeUtils}
 import org.apache.spark.sql.types.{BooleanType, StructType}
 
 import java.util.Base64
 
-
 /**
- * The Command for hoodie MergeIntoTable.
- * The match on condition must contain the row key fields currently, so that we can use Hoodie
- * Index to speed up the performance.
+ * Hudi's implementation of the {@code MERGE INTO} (MIT) Spark SQL statement.
+ *
+ * NOTE: This implementation is restricted in some aspects to accommodate Hudi's crucial
+ *       constraint that every record must bear a unique primary key: the merging condition
+ *       ([[mergeCondition]]) currently can only (and must) reference the target table's
+ *       primary-key columns (this is necessary to leverage Hudi's upserting capabilities,
+ *       including its indexes)
+ *
+ * The following algorithm is applied:
  *
- * The main algorithm:
+ * <ol>
+ *   <li>The incoming batch ([[sourceTable]]) is reshaped such that it bears, correspondingly,
+ *   a) the (required) "primary-key" column and b) the (optional) "pre-combine" column; this is
+ *   required since the MIT statement does not restrict the [[sourceTable]]'s schema to be aligned
+ *   with the [[targetTable]]'s, while Hudi's upserting flow expects such columns to be present</li>
  *
- * We pushed down all the matched and not matched (condition, assignment) expression pairs to the
- * ExpressionPayload. And the matched (condition, assignment) expression pairs will execute in the
- * ExpressionPayload#combineAndGetUpdateValue to compute the result record, while the not matched
- * expression pairs will execute in the ExpressionPayload#getInsertValue.
+ *   <li>After reshaping, [[sourceTable]] is written as a normal batch using Hudi's upserting
+ *   sequence, where a special [[ExpressionPayload]] implementation of [[HoodieRecordPayload]]
+ *   is used, allowing us to execute the updating, deleting and inserting clauses as follows:</li>
  *
- * For Mor table, it is a litter complex than this. The matched record also goes through the getInsertValue
- * and write append to the log. So the update actions & insert actions should process by the same
- * way. We pushed all the update actions & insert actions together to the
- * ExpressionPayload#getInsertValue.
+ *     <ol>
+ *       <li>All the matched {@code WHEN MATCHED AND ... THEN (DELETE|UPDATE ...)} conditional
+ *       clauses will produce [[(condition, expression)]] tuples that will be executed within
+ *       [[ExpressionPayload#combineAndGetUpdateValue]] against the existing (from [[targetTable]])
+ *       and incoming (from [[sourceTable]]) records, producing the updated one;</li>
  *
+ *       <li>Not matched {@code WHEN NOT MATCHED AND ... THEN INSERT ...} conditional clauses
+ *       will produce [[(condition, expression)]] tuples that will be executed within
+ *       [[ExpressionPayload#getInsertValue]] against incoming records, producing the ones to be
+ *       inserted into the target table;</li>
+ *     </ol>
+ * </ol>
+ *
+ * TODO explain workflow for MOR tables
  */
 case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends HoodieLeafRunnableCommand

Review Comment:
   Deleting the custom Spark rules uncovered quite a few issues in this implementation; unfortunately, I had to essentially rewrite it to address them.
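
   To make step (1) of the documented algorithm concrete, here is a minimal, self-contained sketch. It is purely illustrative: Hudi's real reshaping operates on Spark logical plans and resolved expressions, and none of the names below (`Row`, `reshape`, the column names) come from Hudi's codebase.

   ```scala
   // Illustrative only: each "expression" backing a target column is
   // simplified here to a plain source-column reference.
   type Row = Map[String, Any]

   // Align a source row with the target table: copy over the value backing
   // the target's primary-key column and, optionally, its pre-combine column.
   def reshape(source: Row,
               primaryKey: (String, String),          // (targetColumn, sourceColumn)
               preCombine: Option[(String, String)]): Row = {
     val withPk = source + (primaryKey._1 -> source(primaryKey._2))
     preCombine.fold(withPk) { case (tgt, src) => withPk + (tgt -> source(src)) }
   }

   // A source row lacking the target's "uuid"/"ts" columns gains both.
   val reshaped = reshape(
     Map("s_id" -> 1, "s_ts" -> 42L, "name" -> "a"),
     primaryKey = ("uuid", "s_id"),
     preCombine = Some(("ts", "s_ts")))
   ```

   This mirrors why the reshaping is needed at all: the upserting flow downstream assumes the primary-key (and, when configured, pre-combine) columns exist on every incoming record.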



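The (condition, expression) tuple evaluation described in the scaladoc's step (2) can be sketched as follows. This is a simplified model, not `ExpressionPayload`'s actual API: the `Clause` hierarchy and both method signatures are invented stand-ins for illustration.

```scala
// Simplified model of the tuples the MERGE INTO clauses compile into.
case class Record(fields: Map[String, Any])

sealed trait Clause
case class UpdateClause(condition: Record => Boolean, assign: Record => Record) extends Clause
case class DeleteClause(condition: Record => Boolean) extends Clause
case class InsertClause(condition: Record => Boolean, assign: Record => Record) extends Clause

// Analogue of ExpressionPayload#combineAndGetUpdateValue: the first MATCHED
// clause whose condition holds wins; a DELETE clause yields None (record
// removed); if no clause fires, the existing record is kept as-is.
def combineAndGetUpdateValue(existing: Record, incoming: Record,
                             clauses: Seq[Clause]): Option[Record] =
  clauses.collectFirst {
    case UpdateClause(cond, assign) if cond(incoming) => Some(assign(incoming))
    case DeleteClause(cond) if cond(incoming)         => None
  }.getOrElse(Some(existing))

// Analogue of ExpressionPayload#getInsertValue: the first NOT MATCHED clause
// whose condition holds produces the record to insert; otherwise nothing is.
def getInsertValue(incoming: Record, clauses: Seq[InsertClause]): Option[Record] =
  clauses.collectFirst {
    case InsertClause(cond, assign) if cond(incoming) => assign(incoming)
  }
```

As in MERGE INTO itself, clause order matters: the first clause whose condition evaluates to true determines the outcome for a given record.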