This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 05fc359635 [HUDI-5399] Flink mor table streaming read throws NPE (#7504)
05fc359635 is described below
commit 05fc3596357765026f68397c5e2fa7fb017a8044
Author: Danny Chan <[email protected]>
AuthorDate: Mon Dec 19 14:24:26 2022 +0800
[HUDI-5399] Flink mor table streaming read throws NPE (#7504)
---
.../table/format/mor/MergeOnReadInputFormat.java | 16 +++-----
.../apache/hudi/table/format/TestInputFormat.java | 44 ++++++++++++++++++++++
2 files changed, 50 insertions(+), 10 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 094f9f77fc..4ced1bb09f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -188,10 +188,10 @@ public class MergeOnReadInputFormat
protected ClosableIterator<RowData> initIterator(MergeOnReadInputSplit split) throws IOException {
if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) {
- if (split.getInstantRange() != null) {
+ if (split.getInstantRange().isPresent()) {
// base file only with commit time filtering
return new BaseFileOnlyFilteringIterator(
- split.getInstantRange(),
+ split.getInstantRange().get(),
this.tableState.getRequiredRowType(),
getBaseFileIterator(split.getBasePath().get(),
getRequiredPosWithCommitTime(this.requiredPos)));
} else {
@@ -549,11 +549,11 @@ public class MergeOnReadInputFormat
private RowData currentRecord;
BaseFileOnlyFilteringIterator(
- Option<InstantRange> instantRange,
+ InstantRange instantRange,
RowType requiredRowType,
ClosableIterator<RowData> nested) {
this.nested = nested;
- this.instantRange = instantRange.orElse(null);
+ this.instantRange = instantRange;
int[] positions = IntStream.range(1, 1 + requiredRowType.getFieldCount()).toArray();
projection = RowDataProjection.instance(requiredRowType, positions);
}
@@ -562,12 +562,8 @@ public class MergeOnReadInputFormat
public boolean hasNext() {
while (this.nested.hasNext()) {
currentRecord = this.nested.next();
- if (instantRange != null) {
- boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
- if (isInRange) {
- return true;
- }
- } else {
+ boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
+ if (isInRange) {
return true;
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index f9b7cde97d..affb92e884 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -535,6 +535,50 @@ public class TestInputFormat {
assertThat(actual3, is(expected3));
}
+ @Test
+ void testReadBaseFilesWithStartCommit() throws Exception {
+ beforeEach(HoodieTableType.COPY_ON_WRITE);
+
+ org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
+
+ // write base files
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat(true);
+ assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+ IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder()
+ .rowType(TestConfigurations.ROW_TYPE)
+ .conf(conf)
+ .path(FilePathUtils.toFlinkPath(metaClient.getBasePathV2()))
+ .requiredPartitions(new HashSet<>(Arrays.asList("par1", "par2", "par3", "par4")))
+ .build();
+
+ // default read the latest commit
+ IncrementalInputSplits.Result splits1 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+ assertFalse(splits1.isEmpty());
+ List<RowData> result1 = readData(inputFormat, splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+
+ String actual1 = TestData.rowDataToString(result1);
+ String expected1 = TestData.rowDataToString(TestData.DATA_SET_INSERT);
+ assertThat(actual1, is(expected1));
+
+ // write another commit and read again
+ TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
+
+ // read from the latest commit
+ String secondCommit = TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 1, HoodieTimeline.COMMIT_ACTION);
+ conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
+
+ IncrementalInputSplits.Result splits2 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+ assertFalse(splits2.isEmpty());
+ List<RowData> result2 = readData(inputFormat, splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+ String actual2 = TestData.rowDataToString(result2);
+ String expected2 = TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT);
+ assertThat(actual2, is(expected2));
+ }
+
@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception {