yihua commented on code in PR #12584:
URL: https://github.com/apache/hudi/pull/12584#discussion_r1907942035
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -802,27 +813,55 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
assert(insert.assignments.length <= targetTableSchema.length,
s"The number of insert assignments[${insert.assignments.length}] must
be less than or equal to the " +
s"targetTable field size[${targetTableSchema.length}]"))
-
+ // Precombine field and primary key field must be present in the
assignment clause of all insert actions.
+ // Check has no effect if we don't have such fields in target table or we
don't have insert actions
+ insertActions.foreach(action =>
+ hoodieCatalogTable.preCombineKey.foreach(
+ field => {
+ validateTargetTableAttrExistsInAssignments(
+ sparkSession.sessionState.conf.resolver,
+ mergeInto.targetTable,
+ Seq(field),
+ "pre-combine field",
+ action.assignments)
+ validateTargetTableAttrExistsInAssignments(
+ sparkSession.sessionState.conf.resolver,
+ mergeInto.targetTable,
+ hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
+ "primaryKey field",
+ action.assignments)
+ }))
}
- private def checkUpdatingActions(updateActions: Seq[UpdateAction]): Unit = {
+ private def checkUpdatingActions(updateActions: Seq[UpdateAction], props:
Map[String, String]): Unit = {
if (hoodieCatalogTable.preCombineKey.isEmpty && updateActions.nonEmpty) {
- logWarning(s"Updates without precombine can have nondeterministic
behavior")
+ logWarning(s"Updates without pre-combine can have nondeterministic
behavior")
}
updateActions.foreach(update =>
assert(update.assignments.length <= targetTableSchema.length,
- s"The number of update assignments[${update.assignments.length}] must
be less than or equalequal to the " +
+ s"The number of update assignments[${update.assignments.length}] must
be less than or equal to the " +
s"targetTable field size[${targetTableSchema.length}]"))
- // For MOR table, the target table field cannot be the right-value in the
update action.
if (targetTableType == MOR_TABLE_TYPE_OPT_VAL) {
+ // For MOR table, the target table field cannot be the right-value in
the update action.
updateActions.foreach(update => {
val targetAttrs = update.assignments.flatMap(a => a.value.collect {
case attr: AttributeReference if
mergeInto.targetTable.outputSet.contains(attr) => attr
})
assert(targetAttrs.isEmpty,
s"Target table's field(${targetAttrs.map(_.name).mkString(",")})
cannot be the right-value of the update clause for MOR table.")
})
+ // Only when the partial update is enabled that primary key assignment
is not mandatory in update actions for MOR tables.
+ if (!isPartialUpdateActionForMOR(props)) {
+ // For MOR table, update assignment clause must have primary key field
being set explicitly even if it does not
+ // change. The check has no effect if there is no updateActions or we
don't have primaryKey
+ updateActions.foreach(action =>
validateTargetTableAttrExistsInAssignments(
+ sparkSession.sessionState.conf.resolver,
+ mergeInto.targetTable,
+
hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
+ "primaryKey field",
Review Comment:
Use "record key field" in this message to be consistent with the terminology used elsewhere.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala:
##########
@@ -1051,7 +1052,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
s"""
| merge into $tableName
| using (
- | select 1 as id, 'a1' as name, 10 as price, $dataValue as c,
'1' as flag
+ | select 1 as id, 'a1' as name, 10 as price, cast($dataValue as
$dataType) as c, '1' as flag
Review Comment:
Use `1L` if there is any type mismatch in the tests. The user is expected
to provide the data with the correct types.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala:
##########
@@ -1030,4 +1031,175 @@ class TestMergeIntoTable2 extends
HoodieSparkSqlTestBase {
checkAnswer(s"show partitions $destTable")(Seq("dt=2022-09-26"))
}
}
+
+ test("Test MergeInto Anti-Patterns of assignment clauses") {
+ Seq("cow", "mor").foreach { tableType =>
+ withRecordType()(withTempDir { tmp =>
+ log.info(s"Testing table type $tableType")
+ spark.sql(s"set
${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = false")
Review Comment:
We should unset this config after the test, or set it back to its default,
so that it does not affect other tests in the same Spark session (since this
`SET` statement has global impact).
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoLogOnlyTable.scala:
##########
@@ -33,7 +33,7 @@ class TestMergeIntoLogOnlyTable extends
HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | ts long
+ | ts int
Review Comment:
Similar here
##########
hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql:
##########
@@ -63,10 +63,10 @@ select id, name, price, cast(dt as string) from h0_p;
# CREATE TABLE
create table h1 (
- id bigint,
+ id int,
Review Comment:
Why do we need this change?
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -802,27 +813,55 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
assert(insert.assignments.length <= targetTableSchema.length,
s"The number of insert assignments[${insert.assignments.length}] must
be less than or equal to the " +
s"targetTable field size[${targetTableSchema.length}]"))
-
+ // Precombine field and primary key field must be present in the
assignment clause of all insert actions.
+ // Check has no effect if we don't have such fields in target table or we
don't have insert actions
+ insertActions.foreach(action =>
+ hoodieCatalogTable.preCombineKey.foreach(
Review Comment:
Let's only validate the precombine field if the merge mode is `EVENT_TIME_ORDERING`.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala:
##########
@@ -1232,7 +1233,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
| id int,
| name string,
| value int,
- | ts long
+ | ts int
Review Comment:
Let's not change the type of the precombine field.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala:
##########
@@ -40,7 +40,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with
ScalaAssertionSuppo
| id int,
| name string,
| price double,
- | ts long
+ | ts int
Review Comment:
Since we also have `(int) 0` as the natural ordering, making this an integer
can mask issues around the precombine field and the merging logic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]