This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 05fc359635 [HUDI-5399] Flink mor table streaming read throws NPE (#7504)
05fc359635 is described below

commit 05fc3596357765026f68397c5e2fa7fb017a8044
Author: Danny Chan <[email protected]>
AuthorDate: Mon Dec 19 14:24:26 2022 +0800

    [HUDI-5399] Flink mor table streaming read throws NPE (#7504)
---
 .../table/format/mor/MergeOnReadInputFormat.java   | 16 +++-----
 .../apache/hudi/table/format/TestInputFormat.java  | 44 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 10 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 094f9f77fc..4ced1bb09f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -188,10 +188,10 @@ public class MergeOnReadInputFormat
 
   protected ClosableIterator<RowData> initIterator(MergeOnReadInputSplit 
split) throws IOException {
     if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() 
> 0)) {
-      if (split.getInstantRange() != null) {
+      if (split.getInstantRange().isPresent()) {
         // base file only with commit time filtering
         return new BaseFileOnlyFilteringIterator(
-            split.getInstantRange(),
+            split.getInstantRange().get(),
             this.tableState.getRequiredRowType(),
             getBaseFileIterator(split.getBasePath().get(), 
getRequiredPosWithCommitTime(this.requiredPos)));
       } else {
@@ -549,11 +549,11 @@ public class MergeOnReadInputFormat
     private RowData currentRecord;
 
     BaseFileOnlyFilteringIterator(
-        Option<InstantRange> instantRange,
+        InstantRange instantRange,
         RowType requiredRowType,
         ClosableIterator<RowData> nested) {
       this.nested = nested;
-      this.instantRange = instantRange.orElse(null);
+      this.instantRange = instantRange;
       int[] positions = IntStream.range(1, 1 + 
requiredRowType.getFieldCount()).toArray();
       projection = RowDataProjection.instance(requiredRowType, positions);
     }
@@ -562,12 +562,8 @@ public class MergeOnReadInputFormat
     public boolean hasNext() {
       while (this.nested.hasNext()) {
         currentRecord = this.nested.next();
-        if (instantRange != null) {
-          boolean isInRange = 
instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
-          if (isInRange) {
-            return true;
-          }
-        } else {
+        boolean isInRange = 
instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
+        if (isInRange) {
           return true;
         }
       }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index f9b7cde97d..affb92e884 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -535,6 +535,50 @@ public class TestInputFormat {
     assertThat(actual3, is(expected3));
   }
 
+  @Test
+  void testReadBaseFilesWithStartCommit() throws Exception {
+    beforeEach(HoodieTableType.COPY_ON_WRITE);
+
+    org.apache.hadoop.conf.Configuration hadoopConf = 
HadoopConfigurations.getHadoopConf(conf);
+
+    // write base files
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    InputFormat<RowData, ?> inputFormat = 
this.tableSource.getInputFormat(true);
+    assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    IncrementalInputSplits incrementalInputSplits = 
IncrementalInputSplits.builder()
+        .rowType(TestConfigurations.ROW_TYPE)
+        .conf(conf)
+        .path(FilePathUtils.toFlinkPath(metaClient.getBasePathV2()))
+        .requiredPartitions(new HashSet<>(Arrays.asList("par1", "par2", 
"par3", "par4")))
+        .build();
+
+    // default read the latest commit
+    IncrementalInputSplits.Result splits1 = 
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+    assertFalse(splits1.isEmpty());
+    List<RowData> result1 = readData(inputFormat, 
splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+
+    String actual1 = TestData.rowDataToString(result1);
+    String expected1 = TestData.rowDataToString(TestData.DATA_SET_INSERT);
+    assertThat(actual1, is(expected1));
+
+    // write another commit and read again
+    TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
+
+    // read from the latest commit
+    String secondCommit = 
TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 1, 
HoodieTimeline.COMMIT_ACTION);
+    conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
+
+    IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+    assertFalse(splits2.isEmpty());
+    List<RowData> result2 = readData(inputFormat, 
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+    String actual2 = TestData.rowDataToString(result2);
+    String expected2 = 
TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT);
+    assertThat(actual2, is(expected2));
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception {

Reply via email to