This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3be964096ac [HUDI-7046] Fix partial merging logic based on projected reader schema (#10011)
3be964096ac is described below

commit 3be964096ac625bafa4bc0b7625f3d05a0dd2994
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Nov 8 06:20:00 2023 -0800

    [HUDI-7046] Fix partial merging logic based on projected reader schema (#10011)
---
 .../apache/hudi/merge/SparkRecordMergingUtils.java |   3 +-
 .../hudi/common/engine/HoodieReaderContext.java    |  14 ++
 .../read/HoodieBaseFileGroupRecordBuffer.java      |  52 +++---
 .../read/HoodieKeyBasedFileGroupRecordBuffer.java  |   9 +-
 .../HoodiePositionBasedFileGroupRecordBuffer.java  |   9 +-
 .../sql/hudi/TestPartialUpdateForMergeInto.scala   | 184 ++++++++++++---------
 6 files changed, 165 insertions(+), 106 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
index f02ebfc8293..e72c6dab8a5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
@@ -115,6 +115,7 @@ public class SparkRecordMergingUtils {
       InternalRow newPartialRow = newer.getData();
 
      Map<Integer, StructField> mergedIdToFieldMapping = mergedSchemaPair.getLeft();
+      Map<String, Integer> oldNameToIdMapping = getCachedFieldNameToIdMapping(oldSchema);
      Map<String, Integer> newPartialNameToIdMapping = getCachedFieldNameToIdMapping(newSchema);
       List<Object> values = new ArrayList<>(mergedIdToFieldMapping.size());
      for (int fieldId = 0; fieldId < mergedIdToFieldMapping.size(); fieldId++) {
@@ -125,7 +126,7 @@ public class SparkRecordMergingUtils {
          values.add(newPartialRow.get(ordInPartialUpdate, structField.dataType()));
         } else {
          // The field does not exist in the newer record; picks the value from older record
-          values.add(oldRow.get(fieldId, structField.dataType()));
+          values.add(oldRow.get(oldNameToIdMapping.get(structField.name()), structField.dataType()));
         }
       }
       InternalRow mergedRow = new GenericInternalRow(values.toArray());
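
The hunk above is the core of the fix: the older row may have been read, or previously partial-merged, with a projected schema, so its field ordinals no longer line up with the merged schema, and indexing it by the merged-schema field ID can fetch the wrong column. Below is a minimal, self-contained sketch of that name-based lookup idea; the field names, schemas, and plain Object[] row are illustrative stand-ins, not the actual Hudi or Spark types.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative only: shows why the older (projected) row must be read by field
    // name rather than by the merged-schema field ID.
    public class PartialMergeLookupSketch {
      public static void main(String[] args) {
        List<String> mergedFields = List.of("id", "name", "price", "_ts"); // merged schema order
        List<String> oldFields = List.of("_ts", "price");                  // projected schema of the older row
        Object[] oldRow = {1001L, 12.0d};

        // Field-name -> ordinal mapping for the older (projected) schema
        Map<String, Integer> oldNameToId = new HashMap<>();
        for (int i = 0; i < oldFields.size(); i++) {
          oldNameToId.put(oldFields.get(i), i);
        }

        // Assemble the merged row: values from the older row are fetched by name.
        List<Object> values = new ArrayList<>();
        for (int fieldId = 0; fieldId < mergedFields.size(); fieldId++) {
          Integer ordInOldRow = oldNameToId.get(mergedFields.get(fieldId));
          // Indexing oldRow[fieldId] here would be wrong (and out of bounds for fieldId >= 2).
          values.add(ordInOldRow == null ? null : oldRow[ordInOldRow]);
        }
        System.out.println(values); // [null, null, 12.0, 1001]
      }
    }
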
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index d561b059183..b24a4109e75 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -174,4 +174,18 @@ public abstract class HoodieReaderContext<T> {
     meta.put(INTERNAL_META_SCHEMA, schema);
     return meta;
   }
+
+  /**
+   * Updates the schema and reset the ordering value in existing metadata mapping of a record.
+   *
+   * @param meta   Metadata in a mapping.
+   * @param schema New schema to set.
+   * @return The input metadata mapping.
+   */
+  public Map<String, Object> updateSchemaAndResetOrderingValInMetadata(Map<String, Object> meta,
+                                                                       Schema schema) {
+    meta.remove(INTERNAL_META_ORDERING_FIELD);
+    meta.put(INTERNAL_META_SCHEMA, schema);
+    return meta;
+  }
 }
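
The helper added above mutates the per-record metadata map kept by HoodieReaderContext: it removes the cached ordering value and swaps in the schema produced by the merge. A rough, stand-alone sketch of that map update follows; the plain string keys are stand-ins for the INTERNAL_META_SCHEMA and INTERNAL_META_ORDERING_FIELD constants, whose actual values are internal to the class.

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the metadata-map update; key names mirror the INTERNAL_META_*
    // constants in HoodieReaderContext but are plain strings here.
    public class MetadataUpdateSketch {
      static final String META_SCHEMA = "internal.meta.schema";       // stand-in for INTERNAL_META_SCHEMA
      static final String META_ORDERING = "internal.meta.ordering";   // stand-in for INTERNAL_META_ORDERING_FIELD

      static Map<String, Object> updateSchemaAndResetOrderingVal(Map<String, Object> meta, Object newSchema) {
        meta.remove(META_ORDERING);       // old ordering value no longer matches the merged record
        meta.put(META_SCHEMA, newSchema); // record now follows the merged (possibly partial) schema
        return meta;
      }

      public static void main(String[] args) {
        Map<String, Object> meta = new HashMap<>();
        meta.put(META_SCHEMA, "old-partial-schema");
        meta.put(META_ORDERING, 1001L);
        System.out.println(updateSchemaAndResetOrderingVal(meta, "merged-partial-schema"));
        // prints: {internal.meta.schema=merged-partial-schema}
      }
    }
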
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index 90ebf71dfb1..b56885fb0d3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -118,35 +118,39 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
    * @return
    * @throws IOException
    */
-  protected Option<T> doProcessNextDataRecord(T record,
-                                              Map<String, Object> metadata,
-                                              Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) throws IOException {
+  protected Option<Pair<T, Map<String, Object>>> doProcessNextDataRecord(T record,
+                                                                         Map<String, Object> metadata,
+                                                                         Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) throws IOException {
     if (existingRecordMetadataPair != null) {
       // Merge and store the combined record
      // Note that the incoming `record` is from an older commit, so it should be put as
       // the `older` in the merge API
-      HoodieRecord<T> combinedRecord;
-      if (enablePartialMerging) {
-        combinedRecord = (HoodieRecord<T>) recordMerger.partialMerge(
-            readerContext.constructHoodieRecord(Option.of(record), metadata),
-            (Schema) metadata.get(INTERNAL_META_SCHEMA),
-            readerContext.constructHoodieRecord(
-                existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
-            (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
-            readerSchema,
-            payloadProps).get().getLeft();
-      } else {
-        combinedRecord = (HoodieRecord<T>) recordMerger.merge(
-            readerContext.constructHoodieRecord(Option.of(record), metadata),
-            (Schema) metadata.get(INTERNAL_META_SCHEMA),
-            readerContext.constructHoodieRecord(
-                existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
-            (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
-            payloadProps).get().getLeft();
-      }
+      Pair<HoodieRecord, Schema> combinedRecordAndSchema = enablePartialMerging
+          ? recordMerger.partialMerge(
+          readerContext.constructHoodieRecord(Option.of(record), metadata),
+          (Schema) metadata.get(INTERNAL_META_SCHEMA),
+          readerContext.constructHoodieRecord(
+              existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
+          (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
+          readerSchema,
+          payloadProps).get()
+          : recordMerger.merge(
+          readerContext.constructHoodieRecord(Option.of(record), metadata),
+          (Schema) metadata.get(INTERNAL_META_SCHEMA),
+          readerContext.constructHoodieRecord(
+              existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
+          (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
+          payloadProps).get();
+
+      HoodieRecord<T> combinedRecord = combinedRecordAndSchema.getLeft();
+
       // If pre-combine returns existing record, no need to update it
      if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().get()) {
-        return Option.of(combinedRecord.getData());
+        return Option.of(Pair.of(
+            combinedRecord.getData(),
+            enablePartialMerging
+                ? readerContext.updateSchemaAndResetOrderingValInMetadata(metadata, combinedRecordAndSchema.getRight())
+                : metadata));
       }
       return Option.empty();
     } else {
@@ -154,7 +158,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
      // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
      //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
       //       it since these records will be put into records(Map).
-      return Option.of(record);
+      return Option.of(Pair.of(record, metadata));
     }
   }
 
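
With this change doProcessNextDataRecord hands back both the merged record and the metadata map that describes it, so the key- and position-based buffers below can store the pair instead of reusing the incoming metadata, whose schema may be stale after a partial merge. A simplified sketch of that storage pattern follows, using java.util Optional and Map.Entry as stand-ins for Hudi's Option and Pair types and Object for the engine-specific record type.

    import java.util.AbstractMap.SimpleImmutableEntry;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    // Simplified buffer: each key maps to (merged record, metadata describing it).
    public class RecordBufferSketch {
      private final Map<Object, Map.Entry<Optional<Object>, Map<String, Object>>> records = new HashMap<>();

      // Stand-in for doProcessNextDataRecord: it may return updated metadata,
      // e.g. carrying a new schema after a partial merge.
      Optional<Map.Entry<Object, Map<String, Object>>> mergeNext(Object record, Map<String, Object> metadata) {
        return Optional.of(new SimpleImmutableEntry<>(record, metadata));
      }

      void processNextDataRecord(Object record, Map<String, Object> metadata, Object key) {
        mergeNext(record, metadata).ifPresent(recordAndMeta ->
            records.put(key, new SimpleImmutableEntry<>(
                Optional.ofNullable(recordAndMeta.getKey()),  // the merged record
                recordAndMeta.getValue())));                  // its matching metadata, not the stale input map
      }
    }
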
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
index 6e5679adfbf..620c3d84940 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -86,9 +86,12 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupR
   @Override
   public void processNextDataRecord(T record, Map<String, Object> metadata, Object recordKey) throws IOException {
     Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = records.get(recordKey);
-    Option<T> mergedRecord = doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
-    if (mergedRecord.isPresent()) {
-      records.put(recordKey, Pair.of(Option.ofNullable(readerContext.seal(mergedRecord.get())), metadata));
+    Option<Pair<T, Map<String, Object>>> mergedRecordAndMetadata =
+        doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
+    if (mergedRecordAndMetadata.isPresent()) {
+      records.put(recordKey, Pair.of(
+          Option.ofNullable(readerContext.seal(mergedRecordAndMetadata.get().getLeft())),
+          mergedRecordAndMetadata.get().getRight()));
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index 352e7c726d7..be8cf072919 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -114,9 +114,12 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieBaseFileG
   @Override
   public void processNextDataRecord(T record, Map<String, Object> metadata, Object recordPosition) throws IOException {
     Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = records.get(recordPosition);
-    Option<T> mergedRecord = doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
-    if (mergedRecord.isPresent()) {
-      records.put(recordPosition, Pair.of(Option.ofNullable(readerContext.seal(mergedRecord.get())), metadata));
+    Option<Pair<T, Map<String, Object>>> mergedRecordAndMetadata =
+        doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
+    if (mergedRecordAndMetadata.isPresent()) {
+      records.put(recordPosition, Pair.of(
+          Option.ofNullable(readerContext.seal(mergedRecordAndMetadata.get().getLeft())),
+          mergedRecordAndMetadata.get().getRight()));
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
index f1375f0749d..0d2b1e243eb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
@@ -37,92 +37,124 @@ import org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATE
 import org.apache.hudi.metadata.HoodieTableMetadata
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 
-import java.io.File
 import java.util.{Collections, List}
 import scala.collection.JavaConverters._
 
 class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
 
-  test("Test Partial Update") {
-    withTempDir { tmp =>
-      testPartialUpdate(tmp, "cow", "avro")
-      testPartialUpdate(tmp, "mor", "avro")
-      testPartialUpdate(tmp, "mor", "parquet")
-    }
+  test("Test Partial Update with COW and Avro log format") {
+    testPartialUpdate("cow", "avro")
   }
 
-  def testPartialUpdate(tmp: File,
-                        tableType: String,
-                        logDataBlockFormat: String): Unit = {
-    val tableName = generateTableName
-    val basePath = tmp.getCanonicalPath + "/" + tableName
-    spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
-    spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
-    spark.sql(s"set ${LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat")
-    spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
-    spark.sql(s"set ${USE_NEW_HUDI_PARQUET_FILE_FORMAT.key} = true")
-    spark.sql(
-      s"""
-         |create table $tableName (
-         | id int,
-         | name string,
-         | price double,
-         | _ts long
-         |) using hudi
-         |tblproperties(
-         | type ='$tableType',
-         | primaryKey = 'id',
-         | preCombineField = '_ts'
-         |)
-         |location '$basePath'
-        """.stripMargin)
-    spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-
-    spark.sql(
-      s"""
-         |merge into $tableName t0
-         |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as ts ) s0
-         |on t0.id = s0.id
-         |when matched then update set price = s0.price, _ts = s0.ts
-         |""".stripMargin)
-
-    checkAnswer(s"select id, name, price, _ts from $tableName")(
-      Seq(1, "a1", 12.0, 1001)
-    )
+  test("Test Partial Update with MOR and Avro log format") {
+    testPartialUpdate("mor", "avro")
+  }
 
-    if (tableType.equals("mor")) {
-      validateLogBlock(basePath, 1, Seq("price", "_ts"))
-    }
+  test("Test Partial Update with MOR and Parquet log format") {
+    testPartialUpdate("mor", "parquet")
+  }
 
-    if (tableType.equals("cow")) {
-      // No preCombine field
-      val tableName2 = generateTableName
+  def testPartialUpdate(tableType: String,
+                        logDataBlockFormat: String): Unit = {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = tmp.getCanonicalPath + "/" + tableName
+      spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
+      spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
+      spark.sql(s"set ${LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat")
+      spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
+      spark.sql(s"set ${USE_NEW_HUDI_PARQUET_FILE_FORMAT.key} = true")
+
+      // Create a table with five data fields
       spark.sql(
         s"""
-           |create table $tableName2 (
+           |create table $tableName (
            | id int,
            | name string,
-           | price double
+           | price double,
+           | _ts long,
+           | description string
            |) using hudi
            |tblproperties(
            | type ='$tableType',
-           | primaryKey = 'id'
+           | primaryKey = 'id',
+           | preCombineField = '_ts'
            |)
-           |location '${tmp.getCanonicalPath}/$tableName2'
+           |location '$basePath'
         """.stripMargin)
-      spark.sql(s"insert into $tableName2 values(1, 'a1', 10)")
+      spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')," +
+        "(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30, 1250, 'a3: desc3')")
 
+      // Partial updates using MERGE INTO statement with changed fields: "price" and "_ts"
       spark.sql(
         s"""
-           |merge into $tableName2 t0
-           |using ( select 1 as id, 'a1' as name, 12 as price) s0
+           |merge into $tableName t0
+           |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as ts
+           |union select 3 as id, 'a3' as name, 25 as price, 1260 as ts) s0
            |on t0.id = s0.id
-           |when matched then update set price = s0.price
+           |when matched then update set price = s0.price, _ts = s0.ts
            |""".stripMargin)
 
-      checkAnswer(s"select id, name, price from $tableName2")(
-        Seq(1, "a1", 12.0)
+      checkAnswer(s"select id, name, price, _ts, description from $tableName")(
+        Seq(1, "a1", 12.0, 1001, "a1: desc1"),
+        Seq(2, "a2", 20.0, 1200, "a2: desc2"),
+        Seq(3, "a3", 25.0, 1260, "a3: desc3")
       )
+
+      if (tableType.equals("mor")) {
+        validateLogBlock(basePath, 1, Seq(Seq("price", "_ts")))
+      }
+
+      // Partial updates using MERGE INTO statement with changed fields: "description" and "_ts"
+      spark.sql(
+        s"""
+           |merge into $tableName t0
+           |using ( select 1 as id, 'a1' as name, 'a1: updated desc1' as description, 1023 as ts
+           |union select 2 as id, 'a2' as name, 'a2: updated desc2' as description, 1270 as ts) s0
+           |on t0.id = s0.id
+           |when matched then update set description = s0.description, _ts = s0.ts
+           |""".stripMargin)
+
+      checkAnswer(s"select id, name, price, _ts, description from $tableName")(
+        Seq(1, "a1", 12.0, 1023, "a1: updated desc1"),
+        Seq(2, "a2", 20.0, 1270, "a2: updated desc2"),
+        Seq(3, "a3", 25.0, 1260, "a3: desc3")
+      )
+
+      if (tableType.equals("mor")) {
+        validateLogBlock(basePath, 2, Seq(Seq("price", "_ts"), Seq("_ts", "description")))
+      }
+
+      if (tableType.equals("cow")) {
+        // No preCombine field
+        val tableName2 = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName2 (
+             | id int,
+             | name string,
+             | price double
+             |) using hudi
+             |tblproperties(
+             | type ='$tableType',
+             | primaryKey = 'id'
+             |)
+             |location '${tmp.getCanonicalPath}/$tableName2'
+        """.stripMargin)
+        spark.sql(s"insert into $tableName2 values(1, 'a1', 10)")
+
+        spark.sql(
+          s"""
+             |merge into $tableName2 t0
+             |using ( select 1 as id, 'a1' as name, 12 as price) s0
+             |on t0.id = s0.id
+             |when matched then update set price = s0.price
+             |""".stripMargin)
+
+        checkAnswer(s"select id, name, price from $tableName2")(
+          Seq(1, "a1", 12.0)
+        )
+      }
     }
   }
 
@@ -174,7 +206,7 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
 
   def validateLogBlock(basePath: String,
                        expectedNumLogFile: Int,
-                       changedFields: Seq[String]): Unit = {
+                       changedFields: Seq[Seq[String]]): Unit = {
     val hadoopConf = getDefaultHadoopConf
    val metaClient: HoodieTableMetaClient =
      HoodieTableMetaClient.builder.setConf(hadoopConf).setBasePath(basePath).build
@@ -197,18 +229,20 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
     assertEquals(expectedNumLogFile, logFilePathList.size)
 
     val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
-    val logReader = new HoodieLogFileReader(
-      metaClient.getFs, new HoodieLogFile(logFilePathList.get(0)),
-      avroSchema, 1024 * 1024, true, false, false,
-      "id", null)
-    assertTrue(logReader.hasNext)
-    val logBlockHeader = logReader.next().getLogBlockHeader
-    assertTrue(logBlockHeader.containsKey(HeaderMetadataType.SCHEMA))
-    assertTrue(logBlockHeader.containsKey(HeaderMetadataType.IS_PARTIAL))
-    val partialSchema = new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA))
-    val expectedPartialSchema = HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.generateProjectionSchema(
-      avroSchema, changedFields.asJava), false)
-    assertEquals(expectedPartialSchema, partialSchema)
-    assertTrue(logBlockHeader.get(HeaderMetadataType.IS_PARTIAL).toBoolean)
+    for (i <- 0 until expectedNumLogFile) {
+      val logReader = new HoodieLogFileReader(
+        metaClient.getFs, new HoodieLogFile(logFilePathList.get(i)),
+        avroSchema, 1024 * 1024, true, false, false,
+        "id", null)
+      assertTrue(logReader.hasNext)
+      val logBlockHeader = logReader.next().getLogBlockHeader
+      assertTrue(logBlockHeader.containsKey(HeaderMetadataType.SCHEMA))
+      assertTrue(logBlockHeader.containsKey(HeaderMetadataType.IS_PARTIAL))
+      val partialSchema = new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA))
+      val expectedPartialSchema = HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.generateProjectionSchema(
+        avroSchema, changedFields(i).asJava), false)
+      assertEquals(expectedPartialSchema, partialSchema)
+      assertTrue(logBlockHeader.get(HeaderMetadataType.IS_PARTIAL).toBoolean)
+    }
   }
 }
