alexeykudinkin commented on code in PR #6361:
URL: https://github.com/apache/hudi/pull/6361#discussion_r1054943048
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -25,265 +25,266 @@ import org.apache.hudi.common.model.HoodieAvroRecordMerger
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, TBL_NAME}
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
+import org.apache.hudi.config.HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, SparkAdapterSupport}
-import org.apache.spark.sql.HoodieCatalystExpressionUtils.MatchCast
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.{MatchCast, attributeEquals}
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Cast, EqualTo, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Cast, EqualTo, Expression, Literal, NamedExpression, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
-import org.apache.spark.sql.hudi.HoodieSqlUtils.getMergeIntoTargetTableId
-import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.CoercedAttributeReference
+import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
+import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{encodeAsBase64String, toStructType}
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
-import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
+import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.{PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS, _}
import org.apache.spark.sql.hudi.{ProvidesHoodieConfig, SerDeUtils}
-import org.apache.spark.sql.types.{BooleanType, StructType}
+import org.apache.spark.sql.types.{BooleanType, StructField, StructType}
import java.util.Base64
/**
- * The Command for hoodie MergeIntoTable.
- * The match on condition must contain the row key fields currently, so that we can use Hoodie
- * Index to speed up the performance.
+ * Hudi's implementation of the {@code MERGE INTO} (MIT) Spark SQL statement.
*
- * The main algorithm:
+ * NOTE: This implementation is restricted in some aspects to accommodate Hudi's crucial
+ * constraint (of requiring every record to bear a unique primary key): the merging condition
+ * ([[mergeCondition]]) currently can only (and must) reference the target table's primary-key
+ * columns (this is necessary to leverage Hudi's upserting capabilities, including indexes)
*
- * We pushed down all the matched and not matched (condition, assignment) expression pairs to the
- * ExpressionPayload. And the matched (condition, assignment) expression pairs will execute in the
- * ExpressionPayload#combineAndGetUpdateValue to compute the result record, while the not matched
- * expression pairs will execute in the ExpressionPayload#getInsertValue.
+ * The following algorithm is applied:
*
- * For Mor table, it is a litter complex than this. The matched record also goes through the getInsertValue
- * and write append to the log. So the update actions & insert actions should process by the same
- * way. We pushed all the update actions & insert actions together to the
- * ExpressionPayload#getInsertValue.
+ * <ol>
+ * <li>Incoming batch ([[sourceTable]]) is reshaped such that it correspondingly bears:
+ * a) the (required) "primary-key" column, as well as b) the (optional) "pre-combine" column;
+ * this is required since MIT statements do not restrict the [[sourceTable]]'s schema to be
+ * aligned w/ the [[targetTable]]'s, while Hudi's upserting flow expects such columns to be present</li>
*
+ * <li>After reshaping, we're writing [[sourceTable]] as a normal batch using Hudi's upserting
+ * sequence, where a special [[ExpressionPayload]] implementation of [[HoodieRecordPayload]]
+ * is used, allowing us to execute the updating, deleting and inserting clauses as follows:</li>
+ *
+ * <ol>
+ * <li>All the matched {@code WHEN MATCHED AND ... THEN (DELETE|UPDATE ...)} conditional clauses
+ * will produce [[(condition, expression)]] tuples that will be executed w/in the
+ * [[ExpressionPayload#combineAndGetUpdateValue]] against the existing (from [[targetTable]]) and
+ * incoming (from [[sourceTable]]) records, producing the updated one;</li>
+ *
+ * <li>Not matched {@code WHEN NOT MATCHED AND ... THEN INSERT ...} conditional clauses
+ * will produce [[(condition, expression)]] tuples that will be executed w/in
+ * [[ExpressionPayload#getInsertValue]] against incoming records, producing the ones to be
+ * inserted into the target table;</li>
+ * </ol>
+ * </ol>
+ *
+ * TODO explain workflow for MOR tables
Review Comment:
Changes to this command were required due to:
    - Switching from the bespoke resolution to Spark's standard one: we needed to
abide by Spark's semantics and had to get rid of some customizations implemented
previously
    - Cleaning up the code, considerably simplifying the implementation by
getting rid of custom utilities and replacing them w/ Spark's standard ones
(for ex, to resolve and bind expressions, etc)
    - Adding documentation to elaborate on the overall workflow
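The (condition, assignment) pairing the new documentation describes can be sketched in plain Scala. This is a simplified, self-contained model, not Hudi's actual API: names like `Clause`, `Record` and `combine` are illustrative stand-ins for how `ExpressionPayload#combineAndGetUpdateValue` dispatches matched clauses.

```scala
// Simplified model of ExpressionPayload's matched-clause dispatch: each
// WHEN MATCHED clause compiles into a (condition, assignment) pair; the
// first pair whose condition holds for the record pair wins. All names
// here are hypothetical, chosen only for this sketch.
object MergeClauseSketch {
  type Record = Map[String, Any]

  // A compiled clause: predicate over (existing, incoming) plus the
  // projection producing the merged record (None models a DELETE).
  final case class Clause(condition: (Record, Record) => Boolean,
                          assignment: Option[(Record, Record) => Record])

  // Mirrors combineAndGetUpdateValue's shape: evaluate matched clauses in
  // order, returning Some(updated record), None for a delete, or the
  // existing record unchanged when no clause fires.
  def combine(existing: Record, incoming: Record, matchedClauses: Seq[Clause]): Option[Record] =
    matchedClauses.find(_.condition(existing, incoming)) match {
      case Some(Clause(_, Some(assign))) => Some(assign(existing, incoming)) // UPDATE clause
      case Some(Clause(_, None))         => None                             // DELETE clause
      case None                          => Some(existing)                   // no clause matched
    }
}
```

The first-match semantics here reflects that conditional clauses are ordered; the unmatched-insert path (`getInsertValue`) would be modeled the same way but over incoming records only.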
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -120,68 +136,75 @@ object HoodieAnalysis {
rules
}
+ private[sql] object MatchInsertIntoStatement {
+    def unapply(plan: LogicalPlan): Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] =
+ sparkAdapter.getCatalystPlanUtils.unapplyInsertIntoStatement(plan)
+ }
+
+ private[sql] object ResolvesToHudiTable {
+ def unapply(plan: LogicalPlan): Option[CatalogTable] =
+ sparkAdapter.resolveHoodieTable(plan)
+ }
+
+ private[sql] def failAnalysis(msg: String): Nothing = {
+ throw new AnalysisException(msg)
+ }
}
/**
 * Rule for converting the logical plan to a command.
*
* @param sparkSession
*/
-case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
- with SparkAdapterSupport {
+case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan] {
Review Comment:
This is the core of the change:
    - Here we remove the implementation of the bespoke resolution rules for Hudi
components, instead relying on Spark to resolve these (most of these constructs
don't have any custom logic relative to vanilla Spark SQL and therefore are
perfectly fine being resolved by the standard Spark resolution rules)
    - Instead, we repurpose these rules to serve as the conversion point from
Spark's standard constructs (like `MergeIntoTable`) into Hudi's ones that
implement Hudi-specific semantics (`MergeIntoHoodieTableCommand`). Note that we
require these constructs to be fully resolved before we try to convert them
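The repurposed rule described above can be sketched with stand-in types. This is not Spark's or Hudi's actual API: the ADT below is a hypothetical miniature of Catalyst's plan hierarchy, kept only to show the "convert only once fully resolved" guard.

```scala
// Illustrative model of a post-resolution conversion rule: we no longer
// resolve Hudi plans ourselves; we pattern-match Spark's already-resolved
// MergeIntoTable and rewrite it into a Hudi-specific command. Types are
// minimal stand-ins for Catalyst's, not the real classes.
object ConversionRuleSketch {
  sealed trait Plan { def resolved: Boolean }
  final case class MergeIntoTable(target: String, source: String, resolved: Boolean) extends Plan
  final case class MergeIntoHoodieTableCommand(merge: MergeIntoTable) extends Plan {
    val resolved: Boolean = true
  }

  // The rule fires only once Spark's own analysis has fully resolved the
  // construct; unresolved plans pass through untouched, so Spark's standard
  // resolution rules get a chance to run first.
  def apply(plan: Plan): Plan = plan match {
    case m: MergeIntoTable if m.resolved => MergeIntoHoodieTableCommand(m)
    case other                           => other
  }
}
```

The guard on `m.resolved` is the key design point: conversion is deferred until resolution is complete, instead of interleaving bespoke resolution with conversion as the old rules did.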
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]