This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e3dabbc3ad [HUDI-4298] Mor table reading for base and log files lost sequence of events (#6286)
e3dabbc3ad is described below
commit e3dabbc3ad0a686c1bc526eb9b21586fe665812f
Author: HunterXHunter <[email protected]>
AuthorDate: Wed Aug 3 16:49:51 2022 +0800
[HUDI-4298] Mor table reading for base and log files lost sequence of events (#6286)
* [HUDI-4298] Mor table reading for base and log files lost sequence of events
Signed-off-by: HunterXHunter <[email protected]>
---
.../table/format/mor/MergeOnReadInputFormat.java | 11 ++++++---
.../java/org/apache/hudi/util/StreamerUtil.java | 19 +++++++++++-----
.../apache/hudi/table/format/TestInputFormat.java | 26 ++++++++++++++++++++++
3 files changed, 47 insertions(+), 9 deletions(-)
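Context for the change below: when a MERGE_ON_READ split is read, rows from the compacted base file are merged with later log records via the payload's combineAndGetUpdateValue. Before this patch the Flink reader called that method without the payload properties, so ordering-aware payloads such as EventTimeAvroPayload never saw the precombine/event-time field and the log record always overwrote the base row, even when it carried an older event time. A minimal, illustrative sketch of the intended "latest event wins" semantics follows; it is not Hudi code, and the class, method, and field names in it are hypothetical.

    import org.apache.avro.generic.GenericRecord;

    class OrderingFieldMergeSketch {
      // Keep whichever record carries the newer value of the ordering field.
      // Without the ordering field, the log record would always win.
      @SuppressWarnings({"rawtypes", "unchecked"})
      static GenericRecord merge(GenericRecord baseRow, GenericRecord logRecord, String orderingField) {
        Comparable baseOrder = (Comparable) baseRow.get(orderingField);
        Comparable logOrder = (Comparable) logRecord.get(orderingField);
        return logOrder.compareTo(baseOrder) >= 0 ? logRecord : baseRow;
      }
    }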
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 76e9e60ee0..3ca04986fe 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -38,6 +38,7 @@ import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.RowDataProjection;
import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.util.StringToRowDataConverter;
import org.apache.avro.Schema;
@@ -63,6 +64,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Properties;
import java.util.Set;
import java.util.stream.IntStream;
@@ -634,10 +636,12 @@ public class MergeOnReadInputFormat
private final Set<String> keyToSkip = new HashSet<>();
+ private final Properties payloadProps;
+
private RowData currentRecord;
MergeIterator(
- Configuration finkConf,
+ Configuration flinkConf,
org.apache.hadoop.conf.Configuration hadoopConf,
MergeOnReadInputSplit split,
RowType tableRowType,
@@ -650,7 +654,8 @@ public class MergeOnReadInputFormat
ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
this.tableSchema = tableSchema;
this.reader = reader;
- this.scanner = FormatUtils.logScanner(split, tableSchema, finkConf, hadoopConf);
+ this.scanner = FormatUtils.logScanner(split, tableSchema, flinkConf, hadoopConf);
+ this.payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps();
this.logKeysIterator = scanner.getRecords().keySet().iterator();
this.requiredSchema = requiredSchema;
this.requiredPos = requiredPos;
@@ -751,7 +756,7 @@ public class MergeOnReadInputFormat
String curKey) throws IOException {
final HoodieAvroRecord<?> record = (HoodieAvroRecord) scanner.getRecords().get(curKey);
GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
- return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
+ return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema, payloadProps);
}
}
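The behavioral change in the hunk above is the extra payloadProps argument to combineAndGetUpdateValue. A hedged sketch of what those properties are expected to carry; the property keys are assumptions based on hudi-common's payload property names, and "ts" stands in for whatever precombine field the table uses:

    // Built once in the MergeIterator constructor, as in the hunk above:
    Properties payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps();
    // Assumed contents (keys per hudi-common, values from FlinkOptions.PRECOMBINE_FIELD):
    //   hoodie.payload.ordering.field   = ts
    //   hoodie.payload.event.time.field = ts
    // With these present, an EventTimeAvroPayload can compare event times and keep
    // the base-file row when it is newer, instead of always returning the log record.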
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 4e819ecd7b..7c7cdcc8ad 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -228,12 +228,7 @@ public class StreamerUtil {
.withClientNumRetries(30)
.withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
.build())
- .withPayloadConfig(HoodiePayloadConfig.newBuilder()
- .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
- .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
- .withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
- .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
- .build())
+ .withPayloadConfig(getPayloadConfig(conf))
.withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
.withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
.withAutoCommit(false)
@@ -251,6 +246,18 @@ public class StreamerUtil {
return writeConfig;
}
+ /**
+ * Returns the payload config with given configuration.
+ */
+ public static HoodiePayloadConfig getPayloadConfig(Configuration conf) {
+ return HoodiePayloadConfig.newBuilder()
+ .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
+ .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
+ .withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
+ .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
+ .build();
+ }
+
/**
* Converts the give {@link Configuration} to {@link TypedProperties}.
* The default values are also set up.
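The extracted getPayloadConfig helper lets the read path (MergeOnReadInputFormat above) and the write-config builder share one source of payload settings. A small usage sketch, assuming Flink's Configuration and the options shown in the diff; the option values here are only examples:

    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PRECOMBINE_FIELD, "ts");
    conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, EventTimeAvroPayload.class.getName());
    Properties payloadProps = StreamerUtil.getPayloadConfig(conf).getProps();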
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index b663a4af3e..9f82161908 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.format;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -473,6 +474,31 @@ public class TestInputFormat {
TestData.assertRowDataEquals(actual6, Collections.emptyList());
}
+ @Test
+ void testMergeOnReadDisorderUpdateAfterCompaction() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), EventTimeAvroPayload.class.getName());
+ beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+ // write base file first with compaction.
+ conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+ conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+ TestData.writeData(TestData.DATA_SET_DISORDER_INSERT, conf);
+ InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+ final String baseResult = TestData.rowDataToString(readData(inputFormat));
+ String expected = "[+I[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
+ assertThat(baseResult, is(expected));
+
+ // write another commit using logs and read again.
+ conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+ TestData.writeData(TestData.DATA_SET_SINGLE_INSERT, conf);
+ this.tableSource.reset();
+ inputFormat = this.tableSource.getInputFormat();
+ assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+ final String baseMergeLogFileResult = TestData.rowDataToString(readData(inputFormat));
+ assertThat(baseMergeLogFileResult, is(expected));
+ }
+
@Test
void testReadArchivedCommitsIncrementally() throws Exception {
Map<String, String> options = new HashMap<>();