This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a8c5df1544d [HUDI-7070] Disable partial update for MERGE INTO
statement without update action (#10049)
a8c5df1544d is described below
commit a8c5df1544d0255076a4daac8f1af89b7949925b
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Nov 10 05:03:33 2023 -0800
[HUDI-7070] Disable partial update for MERGE INTO statement without update
action (#10049)
This PR adds a check on the update action to disable partial updates for
MERGE INTO statements without an update action. In case inserts are
added to the log block, since inserts have to be full records, the partial
update functionality should be disabled and the IS_PARTIAL log block
header should not be added.
---
.../hudi/command/MergeIntoHoodieTableCommand.scala | 3 +-
.../sql/hudi/TestPartialUpdateForMergeInto.scala | 147 ++++++++++++++++++++-
2 files changed, 142 insertions(+), 8 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index fbc9ab5b4e1..18403872f4a 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -416,7 +416,8 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
&& operation == UPSERT_OPERATION_OPT_VAL
&& parameters.getOrElse(
ENABLE_MERGE_INTO_PARTIAL_UPDATES.key,
- ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean) {
+ ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean
+ && updatingActions.nonEmpty) {
val updatedFieldSet = getUpdatedFields(updatingActions.map(a =>
a.assignments))
// Only enable partial updates if not all fields are updated
if (!areAllFieldsUpdated(updatedFieldSet)) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
index f8bfda7501a..25aa955b811 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
@@ -33,27 +33,41 @@ import
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType
import org.apache.hudi.common.table.view.{FileSystemViewManager,
FileSystemViewStorageConfig, SyncableFileSystemView}
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestUtils.{getDefaultHadoopConf,
getLogFileListFromFileSlice}
+import org.apache.hudi.config.HoodieIndexConfig.INDEX_TYPE
import
org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
import org.apache.hudi.metadata.HoodieTableMetadata
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
-import java.util.{Collections, List}
+import java.util.function.Predicate
+import java.util.{Collections, List, Optional}
import scala.collection.JavaConverters._
class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
- test("Test Partial Update with COW and Avro log format") {
+ test("Test partial update with COW and Avro log format") {
testPartialUpdate("cow", "avro")
}
- test("Test Partial Update with MOR and Avro log format") {
+ test("Test partial update with MOR and Avro log format") {
testPartialUpdate("mor", "avro")
}
- test("Test Partial Update with MOR and Parquet log format") {
+ test("Test partial update with MOR and Parquet log format") {
testPartialUpdate("mor", "parquet")
}
+ test("Test partial update and insert with COW and Avro log format") {
+ testPartialUpdateWithInserts("cow", "avro")
+ }
+
+ test("Test partial update and insert with MOR and Avro log format") {
+ testPartialUpdateWithInserts("mor", "avro")
+ }
+
+ test("Test partial update and insert with MOR and Parquet log format") {
+ testPartialUpdateWithInserts("mor", "parquet")
+ }
+
test("Test fallback to full update with MOR even if partial updates are
enabled") {
withTempDir { tmp =>
val tableName = generateTableName
@@ -103,6 +117,63 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
}
}
+ test("Test MERGE INTO with inserts only on MOR table when partial updates
are enabled") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val basePath = tmp.getCanonicalPath + "/" + tableName
+ spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
+ spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
+ spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
+ spark.sql(s"set ${USE_NEW_HUDI_PARQUET_FILE_FORMAT.key} = true")
+ // Write inserts to log block
+ spark.sql(s"set ${INDEX_TYPE.key} = INMEMORY")
+
+ // Create a table with five data fields
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | _ts long,
+ | description string
+ |) using hudi
+ |tblproperties(
+ | type ='mor',
+ | primaryKey = 'id',
+ | preCombineField = '_ts'
+ |)
+ |location '$basePath'
+ """.stripMargin)
+ spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1:
desc1')," +
+ "(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30, 1250, 'a3: desc3')")
+
+ // Inserts only
+ spark.sql(
+ s"""
+ |merge into $tableName t0
+ |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as _ts,
'a1: updated' as description
+ |union select 3 as id, 'a3' as name, 25 as price, 1260 as _ts, 'a3:
updated' as description
+ |union select 4 as id, 'a4' as name, 60 as price, 1270 as _ts, 'a4:
desc4' as description) s0
+ |on t0.id = s0.id
+ |when not matched then insert *
+ |""".stripMargin)
+
+ checkAnswer(s"select id, name, price, _ts, description from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "a1: desc1"),
+ Seq(2, "a2", 20.0, 1200, "a2: desc2"),
+ Seq(3, "a3", 30.0, 1250, "a3: desc3"),
+ Seq(4, "a4", 60.0, 1270, "a4: desc4")
+ )
+
+ validateLogBlock(
+ basePath,
+ 2,
+ Seq(Seq("id", "name", "price", "_ts", "description"), Seq("id",
"name", "price", "_ts", "description")),
+ false)
+ }
+ }
+
def testPartialUpdate(tableType: String,
logDataBlockFormat: String): Unit = {
withTempDir { tmp =>
@@ -207,6 +278,62 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
}
}
+ def testPartialUpdateWithInserts(tableType: String,
+ logDataBlockFormat: String): Unit = {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val basePath = tmp.getCanonicalPath + "/" + tableName
+ spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
+ spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
+ spark.sql(s"set ${LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat")
+ spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
+ spark.sql(s"set ${USE_NEW_HUDI_PARQUET_FILE_FORMAT.key} = true")
+
+ // Create a table with five data fields
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | _ts long,
+ | description string
+ |) using hudi
+ |tblproperties(
+ | type ='$tableType',
+ | primaryKey = 'id',
+ | preCombineField = '_ts'
+ |)
+ |location '$basePath'
+ """.stripMargin)
+ spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1:
desc1')," +
+ "(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30, 1250, 'a3: desc3')")
+
+ // Partial updates with changed fields: "price" and "_ts" and inserts
using MERGE INTO statement
+ spark.sql(
+ s"""
+ |merge into $tableName t0
+ |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as _ts, ''
as description
+ |union select 3 as id, 'a3' as name, 25 as price, 1260 as _ts, ''
as description
+ |union select 4 as id, 'a4' as name, 70 as price, 1270 as _ts, 'a4:
desc4' as description) s0
+ |on t0.id = s0.id
+ |when matched then update set price = s0.price, _ts = s0._ts
+ |when not matched then insert *
+ |""".stripMargin)
+
+ checkAnswer(s"select id, name, price, _ts, description from $tableName")(
+ Seq(1, "a1", 12.0, 1001, "a1: desc1"),
+ Seq(2, "a2", 20.0, 1200, "a2: desc2"),
+ Seq(3, "a3", 25.0, 1260, "a3: desc3"),
+ Seq(4, "a4", 70.0, 1270, "a4: desc4")
+ )
+
+ if (tableType.equals("mor")) {
+ validateLogBlock(basePath, 1, Seq(Seq("price", "_ts")), true)
+ }
+ }
+ }
+
test("Test MergeInto Exception") {
val tableName = generateTableName
spark.sql(
@@ -273,10 +400,16 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
}
)
val fsView: SyncableFileSystemView =
viewManager.getFileSystemView(metaClient)
- val fileSlice: FileSlice = fsView.getAllFileSlices("").findFirst.get
- val logFilePathList: List[String] = getLogFileListFromFileSlice(fileSlice)
+ val fileSlice: Optional[FileSlice] = fsView.getAllFileSlices("")
+ .filter(new Predicate[FileSlice] {
+ override def test(fileSlice: FileSlice): Boolean = {
+ getLogFileListFromFileSlice(fileSlice).size() == expectedNumLogFile
+ }
+ })
+ .findFirst()
+ assertTrue(fileSlice.isPresent)
+ val logFilePathList: List[String] =
getLogFileListFromFileSlice(fileSlice.get)
Collections.sort(logFilePathList)
- assertEquals(expectedNumLogFile, logFilePathList.size)
val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
for (i <- 0 until expectedNumLogFile) {