This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 30348d0423d [HUDI-7067] Add fallback to full update if all fields are
updated in MERGE INTO statement (#10046)
30348d0423d is described below
commit 30348d0423d493c730662ec74e359075b6aae526
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Nov 9 22:04:44 2023 -0800
[HUDI-7067] Add fallback to full update if all fields are updated in MERGE
INTO statement (#10046)
This commit improves the partial update logic in the Spark SQL MERGE INTO
statement so that partial updates are not enabled if all fields
corresponding to the table schema are updated (e.g., UPDATE SET *). If all
fields are updated, there is no point in marking the updates as partial, and
the existing merging logic based on full updates can still be applied.
---
.../hudi/command/MergeIntoHoodieTableCommand.scala | 52 ++++++++++++----
.../sql/hudi/TestPartialUpdateForMergeInto.scala | 72 +++++++++++++++++++---
2 files changed, 102 insertions(+), 22 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index dd3c9df3bc8..fbc9ab5b4e1 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -410,19 +410,27 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
(DataSourceWriteOptions.TABLE_TYPE.key -> targetTableType)
// Only enable writing partial updates to data blocks for upserts to MOR
tables,
- // when ENABLE_MERGE_INTO_PARTIAL_UPDATES is set to true
- val writePartialUpdates = (targetTableType == MOR_TABLE_TYPE_OPT_VAL
+ // when ENABLE_MERGE_INTO_PARTIAL_UPDATES is set to true,
+ // and not all fields are updated
+ val writePartialUpdates = if (targetTableType == MOR_TABLE_TYPE_OPT_VAL
&& operation == UPSERT_OPERATION_OPT_VAL
&& parameters.getOrElse(
ENABLE_MERGE_INTO_PARTIAL_UPDATES.key,
- ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean)
-
- if (writePartialUpdates) {
- val updatedFieldSeq = getUpdatedFields(updatingActions.map(a =>
a.assignments))
- writeParams ++= Seq(
- WRITE_PARTIAL_UPDATE_SCHEMA.key ->
- HoodieAvroUtils.generateProjectionSchema(fullSchema,
updatedFieldSeq.asJava).toString
- )
+ ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean) {
+ val updatedFieldSet = getUpdatedFields(updatingActions.map(a =>
a.assignments))
+ // Only enable partial updates if not all fields are updated
+ if (!areAllFieldsUpdated(updatedFieldSet)) {
+ val orderedUpdatedFieldSeq = getOrderedUpdatedFields(updatedFieldSet)
+ writeParams ++= Seq(
+ WRITE_PARTIAL_UPDATE_SCHEMA.key ->
+ HoodieAvroUtils.generateProjectionSchema(fullSchema,
orderedUpdatedFieldSeq.asJava).toString
+ )
+ true
+ } else {
+ false
+ }
+ } else {
+ false
}
writeParams ++= Seq(
@@ -474,8 +482,8 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
* @param conditionalAssignments Conditional assignments.
* @return Updated fields based on the conditional assignments in the MERGE
INTO statement.
*/
- private def getUpdatedFields(conditionalAssignments: Seq[Seq[Assignment]]):
Seq[String] = {
- val updatedFieldsSeq = {
+ private def getUpdatedFields(conditionalAssignments: Seq[Seq[Assignment]]):
Set[Attribute] = {
+ {
conditionalAssignments.flatMap {
case assignments =>
// Extract all fields that are updated through the assignments
@@ -490,12 +498,30 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
}
}
}.toSet
+ }
+ /**
+ * @param updatedFieldSet Updated fields based on the conditional
assignments in the MERGE INTO statement.
+ * @return {@code true} if the updated fields covers all fields in the table
schema;
+ * {@code false} otherwise.
+ */
+ private def areAllFieldsUpdated(updatedFieldSet: Set[Attribute]): Boolean = {
+ !mergeInto.targetTable.output
+ .filterNot(attr => isMetaField(attr.name)).exists { tableAttr =>
+ !updatedFieldSet.exists(attr => attributeEquals(attr, tableAttr))
+ }
+ }
+
+ /**
+ * @param updatedFieldSet Set of fields updated.
+ * @return Ordered updated fields based on the target table schema.
+ */
+ private def getOrderedUpdatedFields(updatedFieldSet: Set[Attribute]):
Seq[String] = {
// Reorder the assignments to follow the ordering of the target table
mergeInto.targetTable.output
.filterNot(attr => isMetaField(attr.name))
.filter { tableAttr =>
- updatedFieldsSeq.exists(attr => attributeEquals(attr, tableAttr))
+ updatedFieldSet.exists(attr => attributeEquals(attr, tableAttr))
}
.map(attr => attr.name)
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
index 0d2b1e243eb..f8bfda7501a 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
@@ -35,7 +35,7 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestUtils.{getDefaultHadoopConf,
getLogFileListFromFileSlice}
import
org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
import org.apache.hudi.metadata.HoodieTableMetadata
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import java.util.{Collections, List}
import scala.collection.JavaConverters._
@@ -54,6 +54,55 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
testPartialUpdate("mor", "parquet")
}
+ test("Test fallback to full update with MOR even if partial updates are
enabled") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val basePath = tmp.getCanonicalPath + "/" + tableName
+ spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
+ spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
+ spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
+ spark.sql(s"set ${USE_NEW_HUDI_PARQUET_FILE_FORMAT.key} = true")
+
+ // Create a table with five data fields
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | _ts long,
+ | description string
+ |) using hudi
+ |tblproperties(
+ | type ='mor',
+ | primaryKey = 'id',
+ | preCombineField = '_ts'
+ |)
+ |location '$basePath'
+ """.stripMargin)
+ spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1:
desc1')," +
+ "(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30, 1250, 'a3: desc3')")
+
+ // Update all fields
+ spark.sql(
+ s"""
+ |merge into $tableName t0
+ |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as _ts,
'a1: updated' as description
+ |union select 3 as id, 'a3' as name, 25 as price, 1260 as _ts, 'a3:
updated' as description) s0
+ |on t0.id = s0.id
+ |when matched then update set *
+ |""".stripMargin)
+
+ checkAnswer(s"select id, name, price, _ts, description from $tableName")(
+ Seq(1, "a1", 12.0, 1001, "a1: updated"),
+ Seq(2, "a2", 20.0, 1200, "a2: desc2"),
+ Seq(3, "a3", 25.0, 1260, "a3: updated")
+ )
+
+ validateLogBlock(basePath, 1, Seq(Seq("id", "name", "price", "_ts",
"description")), false)
+ }
+ }
+
def testPartialUpdate(tableType: String,
logDataBlockFormat: String): Unit = {
withTempDir { tmp =>
@@ -102,7 +151,7 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
)
if (tableType.equals("mor")) {
- validateLogBlock(basePath, 1, Seq(Seq("price", "_ts")))
+ validateLogBlock(basePath, 1, Seq(Seq("price", "_ts")), true)
}
// Partial updates using MERGE INTO statement with changed fields:
"description" and "_ts"
@@ -122,7 +171,7 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
)
if (tableType.equals("mor")) {
- validateLogBlock(basePath, 2, Seq(Seq("price", "_ts"), Seq("_ts",
"description")))
+ validateLogBlock(basePath, 2, Seq(Seq("price", "_ts"), Seq("_ts",
"description")), true)
}
if (tableType.equals("cow")) {
@@ -206,7 +255,8 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
def validateLogBlock(basePath: String,
expectedNumLogFile: Int,
- changedFields: Seq[Seq[String]]): Unit = {
+ changedFields: Seq[Seq[String]],
+ isPartial: Boolean): Unit = {
val hadoopConf = getDefaultHadoopConf
val metaClient: HoodieTableMetaClient =
HoodieTableMetaClient.builder.setConf(hadoopConf).setBasePath(basePath).build
@@ -237,12 +287,16 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
assertTrue(logReader.hasNext)
val logBlockHeader = logReader.next().getLogBlockHeader
assertTrue(logBlockHeader.containsKey(HeaderMetadataType.SCHEMA))
- assertTrue(logBlockHeader.containsKey(HeaderMetadataType.IS_PARTIAL))
- val partialSchema = new
Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA))
- val expectedPartialSchema =
HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.generateProjectionSchema(
+ if (isPartial) {
+ assertTrue(logBlockHeader.containsKey(HeaderMetadataType.IS_PARTIAL))
+ assertTrue(logBlockHeader.get(HeaderMetadataType.IS_PARTIAL).toBoolean)
+ } else {
+ assertFalse(logBlockHeader.containsKey(HeaderMetadataType.IS_PARTIAL))
+ }
+ val actualSchema = new
Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA))
+ val expectedSchema =
HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.generateProjectionSchema(
avroSchema, changedFields(i).asJava), false)
- assertEquals(expectedPartialSchema, partialSchema)
- assertTrue(logBlockHeader.get(HeaderMetadataType.IS_PARTIAL).toBoolean)
+ assertEquals(expectedSchema, actualSchema)
}
}
}