codope commented on code in PR #7871:
URL: https://github.com/apache/hudi/pull/7871#discussion_r1103803439
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala:
##########
@@ -75,7 +81,7 @@ trait HoodieCatalystExpressionUtils {
def unapplyCastExpression(expr: Expression): Option[(Expression, DataType,
Option[String], Boolean)]
}
-object HoodieCatalystExpressionUtils {
+object HoodieCatalystExpressionUtils extends SparkAdapterSupport {
Review Comment:
Why does it need to extend `SparkAdapterSupport`? Is there something that
changes across Spark versions?
##########
hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java:
##########
@@ -69,6 +69,26 @@ public static boolean nonEmpty(Collection<?> c) {
return !isNullOrEmpty(c);
}
+ /**
+ * Reduces provided {@link Collection} using provided {@code reducer}
applied to
+ * every element of the collection like following
+ *
+ * {@code reduce(reduce(reduce(identity, e1), e2), ...)}
+ *
+ * @param c target collection to be reduced
+ * @param identity element for reducing to start from
+ * @param reducer actual reducing operator
+ *
+ * @return result of the reduction of the collection using reducing operator
+ */
+ public static <T, U> U reduce(Collection<T> c, U identity, BiFunction<U, T,
U> reducer) {
+ return c.stream()
+ .sequential()
Review Comment:
Does it have to be strictly sequential? The elements of the collection
should be independent of each other. Is there any value in parameterizing
this behavior, say by adding a boolean `shouldReduceParallelly`?
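For illustration, a minimal Scala sketch (not code from this PR; plain Scala
collections stand in for `CollectionUtils`) of why sequential vs. parallel
reduction can matter: with a non-associative reducer the two strategies may
disagree, so parallelism would only be safe to expose if callers guarantee an
associative reducer.

```scala
// Hedged sketch; assumes Scala 2.12-style parallel collections and illustrative values.
val xs = List(1, 2, 3, 4)

// Strictly left-to-right, mirroring reduce(reduce(reduce(identity, e1), e2), ...)
val sequentialResult = xs.foldLeft("id")((acc, e) => s"$acc->$e")
// "id->1->2->3->4"

// A parallel reduction splits the input and has to combine partial results;
// for an order-dependent reducer there is no single correct combiner.
val parallelResult = xs.par.aggregate("id")((acc, e) => s"$acc->$e", (l, r) => s"$l|$r")
// e.g. "id->1->2|id->3->4", depending on how the collection gets split
```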
##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChange.java:
##########
@@ -83,10 +83,16 @@ abstract class BaseColumnChange implements TableChange {
protected final InternalSchema internalSchema;
protected final Map<Integer, Integer> id2parent;
protected final Map<Integer, ArrayList<ColumnPositionChange>>
positionChangeMap = new HashMap<>();
+ protected final boolean caseSensitive;
BaseColumnChange(InternalSchema schema) {
+ this(schema, false);
Review Comment:
Why does `caseSensitive` default to `false`?
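Possibly it just mirrors Spark's analyzer default (an assumption on my part, not
something stated in this PR):

```scala
// Hedged sketch: Spark SQL name resolution is case-insensitive unless
// "spark.sql.caseSensitive" is explicitly enabled, so caseSensitive = false
// here would line up with the analyzer's default behavior.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("case-sensitivity-demo").getOrCreate()
println(spark.conf.get("spark.sql.caseSensitive")) // "false" unless the user overrides it
```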
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -28,97 +28,125 @@ import
org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, TB
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sync.common.HoodieSyncConfig
+import org.apache.hudi.util.JFunction.scalaFunction1Noop
import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions,
HoodieSparkSqlWriter, SparkAdapterSupport}
-import org.apache.spark.sql.HoodieCatalystExpressionUtils.MatchCast
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.{MatchCast,
attributeEquals}
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeReference, BoundReference, Cast, EqualTo, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeReference, BoundReference, EqualTo, Expression, Literal,
NamedExpression, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
-import org.apache.spark.sql.hudi.HoodieSqlUtils.getMergeIntoTargetTableId
+import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
import org.apache.spark.sql.hudi.ProvidesHoodieConfig.combineOptions
-import
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.CoercedAttributeReference
+import
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{CoercedAttributeReference,
encodeAsBase64String, stripCasting, toStructType}
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
-import org.apache.spark.sql.types.{BooleanType, StructType}
+import org.apache.spark.sql.types.{BooleanType, StructField, StructType}
import java.util.Base64
/**
- * The Command for hoodie MergeIntoTable.
- * The match on condition must contain the row key fields currently, so that
we can use Hoodie
- * Index to speed up the performance.
+ * Hudi's implementation of the {@code MERGE INTO} (MIT) Spark SQL statement.
*
- * The main algorithm:
+ * NOTE: That this implementation is restricted in a some aspects to
accommodate for Hudi's crucial
+ * constraint (of requiring every record to bear unique primary-key):
merging condition ([[mergeCondition]])
+ * is currently can only (and must) reference target table's primary-key
columns (this is necessary to
+ * leverage Hudi's upserting capabilities including Indexes)
*
- * We pushed down all the matched and not matched (condition, assignment)
expression pairs to the
- * ExpressionPayload. And the matched (condition, assignment) expression pairs
will execute in the
- * ExpressionPayload#combineAndGetUpdateValue to compute the result record,
while the not matched
- * expression pairs will execute in the ExpressionPayload#getInsertValue.
+ * Following algorithm is applied:
*
- * For Mor table, it is a litter complex than this. The matched record also
goes through the getInsertValue
- * and write append to the log. So the update actions & insert actions should
process by the same
- * way. We pushed all the update actions & insert actions together to the
- * ExpressionPayload#getInsertValue.
+ * <ol>
+ * <li>Incoming batch ([[sourceTable]]) is reshaped such that it bears
correspondingly:
+ * a) (required) "primary-key" column as well as b) (optional) "pre-combine"
column; this is
+ * required since MIT statements does not restrict [[sourceTable]]s schema
to be aligned w/ the
+ * [[targetTable]]s one, while Hudi's upserting flow expects such columns to
be present</li>
Review Comment:
Is this always true? I think if we enable Avro schema validation, which checks
fields by name, then MIT with differently named columns might fail.
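For example, a statement like the following (hedged sketch; table and column
names are made up, and it assumes an active SparkSession `spark` with Hudi's SQL
extensions enabled) is the kind of source/target schema mismatch the reshaping
above is meant to handle; it would be good to confirm it still passes with
`hoodie.avro.schema.validate` enabled:

```scala
// The source columns are named differently from the target's; the rewrite described
// above is supposed to reshape the source so Hudi's upsert flow can still locate the
// primary-key and pre-combine columns.
spark.sql(
  """
    |MERGE INTO hudi_target t
    |USING (SELECT s_id, s_ts, s_name FROM staging_src) s
    |ON t.id = s.s_id
    |WHEN MATCHED THEN UPDATE SET t.name = s.s_name, t.ts = s.s_ts
    |WHEN NOT MATCHED THEN INSERT (id, ts, name) VALUES (s.s_id, s.s_ts, s.s_name)
    |""".stripMargin)
```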
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -554,4 +657,16 @@ object MergeIntoHoodieTableCommand {
}
}
+ def stripCasting(expr: EqualTo): EqualTo = expr match {
+ case EqualTo(MatchCast(leftExpr, leftTargetType, _, _),
MatchCast(rightExpr, rightTargetType, _, _))
+ if leftTargetType.sameType(rightTargetType) => EqualTo(leftExpr,
rightExpr)
+ case _ => expr
+ }
+
+ def toStructType(attrs: Seq[Attribute]): StructType =
+ StructType(attrs.map(a => StructField(a.qualifiedName.replace('.', '_'),
a.dataType, a.nullable, a.metadata)))
Review Comment:
Why do we need to replace periods with underscores?
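For context, my assumption about the intent (not stated in the PR):
`Attribute.qualifiedName` yields names like `t.id`, and an unquoted dot in a
column name is read by Spark as nested-field access, so the name gets flattened
before being used in a `StructType`:

```scala
// Illustrative values only; shows the flattening the helper above performs.
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val qualifiedName = "t.id"                          // e.g. what Attribute.qualifiedName returns
val flattened = qualifiedName.replace('.', '_')     // "t_id" -- no backtick-quoting needed downstream
val schema = StructType(Seq(StructField(flattened, IntegerType, nullable = true)))
```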
##########
hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.analysis
+
+import org.apache.hudi.{DataSourceReadOptions, DefaultSource,
SparkAdapterSupport}
+import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.MatchResolvedTable
+import org.apache.spark.sql.catalyst.analysis.UnresolvedPartitionSpec
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
+import org.apache.spark.sql.catalyst.plans.logcal.HoodieQuery
+import org.apache.spark.sql.catalyst.plans.logcal.HoodieQuery.parseOptions
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+import org.apache.spark.sql.connector.catalog.{Table, V1Table}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import
org.apache.spark.sql.hudi.analysis.HoodieSpark32PlusAnalysis.{HoodieV1OrV2Table,
ResolvesToHudiTable}
+import org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table
+import
org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand,
ShowHoodieTablePartitionsCommand, TruncateHoodieTableCommand}
+import org.apache.spark.sql.{AnalysisException, SQLContext, SparkSession}
+
+/**
+ * NOTE: PLEASE READ CAREFULLY
+ *
+ * Since Hudi relations don't currently implement DS V2 Read API, we have to
fallback to V1 here.
+ * Such fallback will have considerable performance impact, therefore it's
only performed in cases
+ * where V2 API have to be used. Currently only such use-case is using of
Schema Evolution feature
+ *
+ * Check out HUDI-4178 for more details
+ */
+case class HoodieDataSourceV2ToV1Fallback(sparkSession: SparkSession) extends
Rule[LogicalPlan]
Review Comment:
Can you help me understand why this is only needed for Spark 3.2.x?
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala:
##########
@@ -20,35 +20,44 @@ package org.apache.spark.sql.hudi.command
import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter}
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import
org.apache.spark.sql.hudi.command.HoodieLeafRunnableCommand.stripMetaFieldAttributes
-case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends
HoodieLeafRunnableCommand
- with SparkAdapterSupport with ProvidesHoodieConfig {
+case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends
HoodieLeafRunnableCommand
+ with SparkAdapterSupport
+ with ProvidesHoodieConfig {
- private val table = deleteTable.table
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalogTable = sparkAdapter.resolveHoodieTable(dft.table)
+ .map(HoodieCatalogTable(sparkSession, _))
+ .get
- private val tableId = getTableIdentifier(table)
+ val tableId = catalogTable.table.qualifiedName
- override def run(sparkSession: SparkSession): Seq[Row] = {
- logInfo(s"start execute delete command for $tableId")
+ logInfo(s"Executing 'DELETE FROM' command for $tableId")
+
+ val condition = sparkAdapter.extractDeleteCondition(dft)
+
+ val targetLogicalPlan = stripMetaFieldAttributes(dft.table)
Review Comment:
So what happens if the delete query is predicated on one of the meta fields,
e.g. `DELETE FROM table WHERE _hoodie_commit_time > '2023-01-01'`?
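Concretely, something like this (hedged sketch; table name and timestamp are
made up, and it assumes an active SparkSession `spark` with Hudi's SQL
extensions enabled):

```scala
// _hoodie_commit_time holds commit instant times such as "20230101103045123";
// the question is whether this predicate still resolves once meta-field
// attributes are stripped from the target plan, or whether it fails analysis.
spark.sql("DELETE FROM hudi_tbl WHERE _hoodie_commit_time > '20230101000000'")
```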
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -173,7 +172,21 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
(avroSchema, internalSchemaOpt)
}
- protected lazy val tableStructSchema: StructType =
AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+ protected lazy val tableStructSchema: StructType = {
+ val converted =
AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+
+ val resolver = sparkSession.sessionState.analyzer.resolver
+ val metaFieldMetadata = sparkAdapter.createCatalystMetadataForMetaField
+
+ // TODO elaborate
Review Comment:
I think the code below speaks for itself, but please remove the TODO and add a
comment if you think it needs a description.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala:
##########
@@ -50,6 +50,9 @@ class HoodieSparkSqlTestBase extends FunSuite with
BeforeAndAfterAll {
dir
}
+ // NOTE: We need to set "spark.testing" property to make sure Spark can
appropriately
+ // recognize environment as testing
+ System.setProperty("spark.testing", "true")
Review Comment:
Interesting! Is this property available across all Spark versions, and what are
the benefits of setting it?
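From what I understand of Spark's internals (an assumption, not something this
PR states), the property feeds Spark's `Utils.isTesting` check, which a number
of defaults and assertions key off of; roughly:

```scala
// Paraphrase of Spark's internal "is testing" check (not Hudi code):
val isTesting: Boolean =
  sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
```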
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala:
##########
@@ -48,47 +47,19 @@ trait HoodieCatalystPlansUtils {
*/
def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan
- /**
- * Convert a AliasIdentifier to TableIdentifier.
- */
- def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier
-
- /**
- * Convert a UnresolvedRelation to TableIdentifier.
- */
- def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier
-
/**
* Create Join logical plan.
*/
def createJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType):
Join
/**
- * Test if the logical plan is a Insert Into LogicalPlan.
- */
- def isInsertInto(plan: LogicalPlan): Boolean
-
- /**
- * Get the member of the Insert Into LogicalPlan.
+ * Decomposes [[InsertIntoStatement]] into its arguments allowing to
accommodate for API
+ * changes in Spark 3.3
*/
- def getInsertIntoChildren(plan: LogicalPlan):
- Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean,
Boolean)]
+ def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan,
Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]
- /**
- * if the logical plan is a TimeTravelRelation LogicalPlan.
- */
- def isRelationTimeTravel(plan: LogicalPlan): Boolean
-
- /**
- * Get the member of the TimeTravelRelation LogicalPlan.
- */
- def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan,
Option[Expression], Option[String])]
-
- /**
- * Create a Insert Into LogicalPlan.
- */
- def createInsertInto(table: LogicalPlan, partition: Map[String,
Option[String]],
- query: LogicalPlan, overwrite: Boolean,
ifPartitionNotExists: Boolean): LogicalPlan
+ // TODO scala-docs
Review Comment:
Add the docs or remove the TODO comment?
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -539,6 +599,49 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
combineOptions(hoodieCatalogTable, tableConfig,
sparkSession.sqlContext.conf,
defaultOpts = Map.empty, overridingOpts = overridingOpts)
}
+
+
+ def validate(mit: MergeIntoTable): Unit = {
+ checkUpdatingActions(updatingActions)
+ checkInsertingActions(insertingActions)
+ checkDeletingActions(deletingActions)
+ }
+
+ private def checkDeletingActions(deletingActions: Seq[DeleteAction]): Unit =
{
+ if (deletingActions.length > 1) {
+ throw new AnalysisException(s"Only one deleting action is supported in
MERGE INTO statement (provided ${deletingActions.length})")
+ }
+ }
+
+ private def checkInsertingActions(insertActions: Seq[InsertAction]): Unit = {
+ insertActions.foreach(insert =>
+ assert(insert.assignments.length == targetTableSchema.length,
+ s"The number of insert assignments[${insert.assignments.length}] must
equal to the " +
+ s"targetTable field size[${targetTableSchema.length}]"))
+
+ }
+
+ private def checkUpdatingActions(updateActions: Seq[UpdateAction]): Unit = {
+ if (updateActions.length > 1) {
+ throw new AnalysisException(s"Only one updating action is supported in
MERGE INTO statement (provided ${updateActions.length})")
+ }
+
+ //updateActions.foreach(update =>
+ // assert(update.assignments.length == targetTableSchema.length,
+ // s"The number of update assignments[${update.assignments.length}]
must equal to the " +
+ // s"targetTable field size[${targetTableSchema.length}]"))
Review Comment:
Remove the commented-out part?
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -127,164 +155,189 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// target table side (since we're gonna be matching against primary-key
column as is) expression
// on the opposite side of the comparison should be cast-able to the
primary-key column's data-type
// t/h "up-cast" (ie w/o any loss in precision)
- val target2Source = cleanedConditions.map {
- case EqualTo(CoercedAttributeReference(attr), expr)
- if targetAttrs.exists(f => attributeEqual(f, attr, resolver)) =>
- if (exprUtils.canUpCast(expr.dataType, attr.dataType)) {
- targetAttrs.find(f => resolver(f.name, attr.name)).get.name ->
- castIfNeeded(expr, attr.dataType, sparkSession.sqlContext.conf)
- } else {
- throw new AnalysisException(s"Invalid MERGE INTO matching
condition: ${expr.sql}: "
- + s"can't cast ${expr.sql} (of ${expr.dataType}) to
${attr.dataType}")
- }
+ val targetAttr2ConditionExpressions = cleanedConditions.map {
+ case EqualTo(CoercedAttributeReference(attr), expr) if
targetAttrs.exists(f => attributeEquals(f, attr)) =>
+ if (exprUtils.canUpCast(expr.dataType, attr.dataType)) {
+ // NOTE: It's critical we reference output attribute here and not
the one from condition
+ val targetAttr = targetAttrs.find(f => attributeEquals(f, attr)).get
+ targetAttr -> castIfNeeded(expr, attr.dataType)
+ } else {
+ throw new AnalysisException(s"Invalid MERGE INTO matching condition:
${expr.sql}: "
+ + s"can't cast ${expr.sql} (of ${expr.dataType}) to
${attr.dataType}")
+ }
- case EqualTo(expr, CoercedAttributeReference(attr))
- if targetAttrs.exists(f => attributeEqual(f, attr, resolver)) =>
- if (exprUtils.canUpCast(expr.dataType, attr.dataType)) {
- targetAttrs.find(f => resolver(f.name, attr.name)).get.name ->
- castIfNeeded(expr, attr.dataType, sparkSession.sqlContext.conf)
- } else {
- throw new AnalysisException(s"Invalid MERGE INTO matching
condition: ${expr.sql}: "
- + s"can't cast ${expr.sql} (of ${expr.dataType}) to
${attr.dataType}")
- }
+ case EqualTo(expr, CoercedAttributeReference(attr)) if
targetAttrs.exists(f => attributeEquals(f, attr)) =>
+ if (exprUtils.canUpCast(expr.dataType, attr.dataType)) {
+ // NOTE: It's critical we reference output attribute here and not
the one from condition
+ val targetAttr = targetAttrs.find(f => attributeEquals(f, attr)).get
+ targetAttr -> castIfNeeded(expr, attr.dataType)
+ } else {
+ throw new AnalysisException(s"Invalid MERGE INTO matching condition:
${expr.sql}: "
+ + s"can't cast ${expr.sql} (of ${expr.dataType}) to
${attr.dataType}")
+ }
case expr =>
throw new AnalysisException(s"Invalid MERGE INTO matching condition:
`${expr.sql}`: "
+ "expected condition should be 'target.id = <source-column-expr>',
e.g. "
+ "`t.id = s.id` or `t.id = cast(s.id, ...)`")
- }.toMap
+ }
- target2Source
+ targetAttr2ConditionExpressions.collect {
+ case (attr, expr) if resolver(attr.name, primaryKeyField) =>
+ // NOTE: Here we validate that condition expression involving
primary-key column(s) is a simple
+ // attribute-reference expression (possibly wrapped into a
cast). This is necessary to disallow
+ // statements like following
+ //
+ // MERGE INTO ... AS t USING (
+ // SELECT ... FROM ... AS s
+ // )
+ // ON t.id = s.id + 1
+ // WHEN MATCHED THEN UPDATE *
+ //
+ // Which (in the current design) could result in a primary key
of the record being modified,
+ // which is not allowed.
+ if (!resolvesToSourceAttribute(expr)) {
+ throw new AnalysisException("Only simple conditions of the form
`t.id = s.id` are allowed on the " +
+ s"primary-key column. Found `${attr.sql} = ${expr.sql}`")
+ }
+
+ (attr, expr)
+ }
}
/**
- * Get the mapping of target preCombineField to the source expression.
+ * Please check description for [[primaryKeyAttributeToConditionExpression]]
*/
- private lazy val target2SourcePreCombineFiled: Option[(String, Expression)]
= {
- val updateActions = mergeInto.matchedActions.collect { case u:
UpdateAction => u }
- assert(updateActions.size <= 1, s"Only support one updateAction currently,
current update action count is: ${updateActions.size}")
-
- val updateAction = updateActions.headOption
- hoodieCatalogTable.preCombineKey.map(preCombineField => {
- val sourcePreCombineField =
- updateAction.map(u => u.assignments.filter {
- case Assignment(key: AttributeReference, _) =>
key.name.equalsIgnoreCase(preCombineField)
- case _=> false
- }.head.value
- ).getOrElse {
- // If there is no update action, mapping the target column to the
source by order.
- val target2Source = mergeInto.targetTable.output
- .filter(attr => !isMetaField(attr.name))
- .map(_.name)
- .zip(mergeInto.sourceTable.output.filter(attr =>
!isMetaField(attr.name)))
- .toMap
- target2Source.getOrElse(preCombineField, null)
+ private lazy val preCombineAttributeAssociatedExpression: Option[(Attribute,
Expression)] = {
+ val resolver = sparkSession.sessionState.analyzer.resolver
+ hoodieCatalogTable.preCombineKey.map { preCombineField =>
+ val targetPreCombineAttribute =
+ mergeInto.targetTable.output
+ .find { attr => resolver(attr.name, preCombineField) }
+ .get
+
+ // To find corresponding "pre-combine" attribute w/in the
[[sourceTable]] we do
+ // - Check if we can resolve the attribute w/in the source table as
is; if unsuccessful, then
+ // - Check if in any of the update actions, right-hand side of the
assignment actually resolves
+ // to it, in which case we will determine left-hand side expression
as the value of "pre-combine"
+ // attribute w/in the [[sourceTable]]
+ val sourceExpr = {
+ mergeInto.sourceTable.output.find(attr => resolver(attr.name,
preCombineField)) match {
+ case Some(attr) => attr
+ case None =>
+ updatingActions.flatMap(_.assignments).collectFirst {
+ case Assignment(attr: AttributeReference, expr)
+ if resolver(attr.name, preCombineField) &&
resolvesToSourceAttribute(expr) => expr
+ } getOrElse {
+ throw new AnalysisException(s"Failed to resolve pre-combine
field `${preCombineField}` w/in the source-table output")
+ }
+
}
- (preCombineField, sourcePreCombineField)
- }).filter(p => p._2 != null)
+ }
+
+ (targetPreCombineAttribute, sourceExpr)
+ }
}
override def run(sparkSession: SparkSession): Seq[Row] = {
this.sparkSession = sparkSession
+ // TODO move to analysis phase
+ validate(mergeInto)
Review Comment:
This is a good point. Do you intend to move it to the analysis phase in this PR itself?
##########
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/analysis/HoodieSpark2Analysis.scala:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate,
CurrentTimestamp, Expression, ExtractValue, GetStructField, LambdaFunction}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment,
DeleteAction, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction,
Window}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.toPrettySQL
+
+/**
+ * NOTE: This code is borrowed from Spark 3.1.3
+ * This code is borrowed, so that we can have some advanced Spark SQL
functionality (like Merge Into, for ex)
+ * in Spark 2.x
+ *
+ * PLEASE REFRAIN MAKING ANY CHANGES TO THIS CODE UNLESS ABSOLUTELY
NECESSARY
+ */
+object HoodieSpark2Analysis {
Review Comment:
I thought MIT was already supported in Spark 2.x. We have unit tests for MIT that run
against Spark 2.x as well, don't we? If it's not supported, should we make that clear
in the quickstart guide as well? And should we add this support in a separate
PR?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]