This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch release-0.12.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 78a56ba082538fbdea384e69ee690c6f909adb33
Author: HunterXHunter <[email protected]>
AuthorDate: Wed Aug 3 16:49:51 2022 +0800

    [HUDI-4298] Mor table reading for base and log files lost sequence of events (#6286)
    
    * [HUDI-4298] Mor table reading for base and log files lost sequence of events
    
    Signed-off-by: HunterXHunter <[email protected]>
---
 .../table/format/mor/MergeOnReadInputFormat.java   | 11 ++++++---
 .../java/org/apache/hudi/util/StreamerUtil.java    | 19 +++++++++++-----
 .../apache/hudi/table/format/TestInputFormat.java  | 26 ++++++++++++++++++++++
 3 files changed, 47 insertions(+), 9 deletions(-)

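Context for the change: when merging a base-file row with a log record,
MergeOnReadInputFormat previously called the two-argument
combineAndGetUpdateValue overload, which carries no payload properties.
An ordering-aware payload such as EventTimeAvroPayload therefore had no
ordering field to compare against and degraded to overwrite-with-latest
semantics, so a late-arriving update with an older event time could
replace newer data. Below is a minimal sketch (not part of the patch) of
the properties now threaded through the merge path; the "ts" field name
is an assumption for illustration:

    import java.util.Properties;

    public class PayloadPropsSketch {
      public static void main(String[] args) {
        Properties payloadProps = new Properties();
        // Both keys are derived from the Flink pre-combine field; see the
        // new StreamerUtil#getPayloadConfig helper further below.
        payloadProps.setProperty("hoodie.payload.ordering.field", "ts");
        payloadProps.setProperty("hoodie.payload.event.time.field", "ts");
        // Passed as the third argument of combineAndGetUpdateValue, these
        // keys let the payload compare event times instead of always
        // preferring the log record.
        System.out.println(payloadProps);
      }
    }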
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 76e9e60ee0..3ca04986fe 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -38,6 +38,7 @@ import org.apache.hudi.util.AvroToRowDataConverters;
 import org.apache.hudi.util.DataTypeUtils;
 import org.apache.hudi.util.RowDataProjection;
 import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.util.StringToRowDataConverter;
 
 import org.apache.avro.Schema;
@@ -63,6 +64,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Properties;
 import java.util.Set;
 import java.util.stream.IntStream;
 
@@ -634,10 +636,12 @@ public class MergeOnReadInputFormat
 
     private final Set<String> keyToSkip = new HashSet<>();
 
+    private final Properties payloadProps;
+
     private RowData currentRecord;
 
     MergeIterator(
-        Configuration finkConf,
+        Configuration flinkConf,
         org.apache.hadoop.conf.Configuration hadoopConf,
         MergeOnReadInputSplit split,
         RowType tableRowType,
@@ -650,7 +654,8 @@ public class MergeOnReadInputFormat
        ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
       this.tableSchema = tableSchema;
       this.reader = reader;
-      this.scanner = FormatUtils.logScanner(split, tableSchema, finkConf, hadoopConf);
+      this.scanner = FormatUtils.logScanner(split, tableSchema, flinkConf, hadoopConf);
+      this.payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps();
       this.logKeysIterator = scanner.getRecords().keySet().iterator();
       this.requiredSchema = requiredSchema;
       this.requiredPos = requiredPos;
@@ -751,7 +756,7 @@ public class MergeOnReadInputFormat
         String curKey) throws IOException {
      final HoodieAvroRecord<?> record = (HoodieAvroRecord) scanner.getRecords().get(curKey);
      GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
-      return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
+      return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema, payloadProps);
     }
   }
 
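For reference, a self-contained sketch of how an event-time payload
resolves this merge once the properties are supplied. This is not part of
the patch; the schema and values are hypothetical, modeled on the test
expectation further below:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.generic.IndexedRecord;
    import org.apache.hudi.common.model.EventTimeAvroPayload;
    import org.apache.hudi.common.util.Option;

    import java.util.Properties;

    public class EventTimeMergeSketch {
      public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"string\"},"
                + "{\"name\":\"ts\",\"type\":\"long\"}]}");

        // Base-file row: the newer event (ts = 4).
        GenericRecord base = new GenericData.Record(schema);
        base.put("id", "id1");
        base.put("ts", 4L);

        // Log record that arrived later but carries an older event (ts = 2).
        GenericRecord late = new GenericData.Record(schema);
        late.put("id", "id1");
        late.put("ts", 2L);

        Properties props = new Properties();
        props.setProperty("hoodie.payload.ordering.field", "ts");
        props.setProperty("hoodie.payload.event.time.field", "ts");

        EventTimeAvroPayload logPayload = new EventTimeAvroPayload(late, 2L);
        // With props, the payload compares "ts" and keeps the base row
        // (ts = 4); without them the stale log record would have won.
        Option<IndexedRecord> merged =
            logPayload.combineAndGetUpdateValue(base, schema, props);
        System.out.println(merged.get());
      }
    }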
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 4e819ecd7b..7c7cdcc8ad 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -228,12 +228,7 @@ public class StreamerUtil {
                 .withClientNumRetries(30)
                 .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
                 .build())
-            .withPayloadConfig(HoodiePayloadConfig.newBuilder()
-                .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
-                .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
-                .withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
-                .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
-                .build())
+            .withPayloadConfig(getPayloadConfig(conf))
             .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
             .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
             .withAutoCommit(false)
@@ -251,6 +246,18 @@ public class StreamerUtil {
     return writeConfig;
   }
 
+  /**
+   * Returns the payload config with given configuration.
+   */
+  public static HoodiePayloadConfig getPayloadConfig(Configuration conf) {
+    return HoodiePayloadConfig.newBuilder()
+        .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
+        .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
+        .withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
+        .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
+        .build();
+  }
+
   /**
    * Converts the give {@link Configuration} to {@link TypedProperties}.
    * The default values are also set up.
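The extracted StreamerUtil#getPayloadConfig keeps the write-client builder
and the new read-side merge path consistent: both now derive the payload
class, ordering field, and event-time field from the same Flink options.
A hypothetical usage sketch (the "ts" value is an assumption):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.util.StreamerUtil;

    import java.util.Properties;

    public class PayloadConfigUsage {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Assumption for illustration: "ts" is the table's pre-combine field.
        conf.setString(FlinkOptions.PRECOMBINE_FIELD, "ts");
        // Same helper that MergeOnReadInputFormat now calls to build the
        // payload Properties for combineAndGetUpdateValue.
        Properties payloadProps = StreamerUtil.getPayloadConfig(conf).getProps();
        System.out.println(payloadProps);
      }
    }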
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index b663a4af3e..9f82161908 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.format;
 
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -473,6 +474,31 @@ public class TestInputFormat {
     TestData.assertRowDataEquals(actual6, Collections.emptyList());
   }
 
+  @Test
+  void testMergeOnReadDisorderUpdateAfterCompaction() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), EventTimeAvroPayload.class.getName());
+    beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+    // write base file first with compaction.
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+    TestData.writeData(TestData.DATA_SET_DISORDER_INSERT, conf);
+    InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+    final String baseResult = TestData.rowDataToString(readData(inputFormat));
+    String expected = "[+I[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
+    assertThat(baseResult, is(expected));
+
+    // write another commit using logs and read again.
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+    TestData.writeData(TestData.DATA_SET_SINGLE_INSERT, conf);
+    this.tableSource.reset();
+    inputFormat = this.tableSource.getInputFormat();
+    assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+    final String baseMergeLogFileResult = TestData.rowDataToString(readData(inputFormat));
+    assertThat(baseMergeLogFileResult, is(expected));
+  }
+
   @Test
   void testReadArchivedCommitsIncrementally() throws Exception {
     Map<String, String> options = new HashMap<>();
