This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e9a67bfc497 [HUDI-6358] Fix flink payload merger with deletes (#8935)
e9a67bfc497 is described below

commit e9a67bfc49759637ec680c27179e25c01cae10b7
Author: Danny Chan <[email protected]>
AuthorDate: Mon Jun 12 22:42:17 2023 +0800

    [HUDI-6358] Fix flink payload merger with deletes (#8935)
---
 .../apache/hudi/table/format/mor/MergeOnReadInputFormat.java  | 11 +++++------
 .../java/org/apache/hudi/table/format/TestInputFormat.java    |  8 ++++++++
 .../src/test/java/org/apache/hudi/utils/TestData.java         |  4 ++++
 3 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 767968629d1..86240fa5982 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -73,7 +73,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.IntStream;
@@ -767,7 +766,7 @@ public class MergeOnReadInputFormat
         final String curKey = 
currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString();
         if (scanner.getRecords().containsKey(curKey)) {
           keyToSkip.add(curKey);
-          Option<HoodieAvroIndexedRecord> mergedAvroRecord = 
mergeRowWithLog(currentRecord, curKey);
+          Option<HoodieRecord<IndexedRecord>> mergedAvroRecord = 
mergeRowWithLog(currentRecord, curKey);
           if (!mergedAvroRecord.isPresent()) {
             // deleted
             continue;
@@ -838,13 +837,13 @@ public class MergeOnReadInputFormat
       }
     }
 
-    private Option<HoodieAvroIndexedRecord> mergeRowWithLog(RowData curRow, 
String curKey) {
-      final HoodieAvroRecord<?> record = (HoodieAvroRecord) 
scanner.getRecords().get(curKey);
+    @SuppressWarnings("unchecked")
+    private Option<HoodieRecord<IndexedRecord>> mergeRowWithLog(RowData 
curRow, String curKey) {
+      final HoodieRecord<?> record = scanner.getRecords().get(curKey);
       GenericRecord historyAvroRecord = (GenericRecord) 
rowDataToAvroConverter.convert(tableSchema, curRow);
       HoodieAvroIndexedRecord hoodieAvroIndexedRecord = new 
HoodieAvroIndexedRecord(historyAvroRecord);
       try {
-        Option<HoodieRecord> resultRecord = 
recordMerger.merge(hoodieAvroIndexedRecord, tableSchema, record, tableSchema, 
payloadProps).map(Pair::getLeft);
-        return resultRecord.get().toIndexedRecord(tableSchema, new 
Properties());
+        return recordMerger.merge(hoodieAvroIndexedRecord, tableSchema, 
record, tableSchema, payloadProps).map(Pair::getLeft);
       } catch (IOException e) {
         throw new HoodieIOException("Merge base and delta payloads exception", 
e);
       }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index d1b5516d1ad..7c076344166 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -952,6 +952,14 @@ public class TestInputFormat {
     assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
     final String baseMergeLogFileResult = 
TestData.rowDataToString(readData(inputFormat));
     assertThat(baseMergeLogFileResult, is(expected));
+
+    // write another commit with delete messages
+    TestData.writeData(TestData.DATA_SET_SINGLE_DELETE, conf);
+    this.tableSource.reset();
+    inputFormat = this.tableSource.getInputFormat();
+    assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+    final String baseMergeLogFileResult2 = 
TestData.rowDataToString(readData(inputFormat));
+    assertThat(baseMergeLogFileResult2, is("[]"));
   }
 
   @Test
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 7199ba069fc..7f9ad108941 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -330,6 +330,10 @@ public class TestData {
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 
23,
           TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
 
+  public static List<RowData> DATA_SET_SINGLE_DELETE = 
Collections.singletonList(
+      deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 
23,
+          TimestampData.fromEpochMillis(5), StringData.fromString("par1")));
+
   public static List<RowData> DATA_SET_DISORDER_INSERT = Arrays.asList(
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 
23,
           TimestampData.fromEpochMillis(3), StringData.fromString("par1")),

Reply via email to