This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 642f87cc6b [HUDI-4601] Read error from MOR table after compaction with
timestamp partitioning (#6365)
642f87cc6b is described below
commit 642f87cc6b6b2971911d2f27619ee6e6f02e76a4
Author: wuwenchi <[email protected]>
AuthorDate: Wed Aug 17 09:49:41 2022 +0800
[HUDI-4601] Read error from MOR table after compaction with timestamp
partitioning (#6365)
* Fix read error from MOR table after compaction
Co-authored-by: 吴文池 <[email protected]>
---
.../table/format/cow/CopyOnWriteInputFormat.java | 11 +++++---
.../table/format/mor/MergeOnReadInputFormat.java | 11 +++++---
.../apache/hudi/table/ITTestHoodieDataSource.java | 29 +++++++++++++++++-----
3 files changed, 39 insertions(+), 12 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
index 719669b532..f04c23fe91 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
@@ -108,9 +108,14 @@ public class CopyOnWriteInputFormat extends
FileInputFormat<RowData> {
LinkedHashMap<String, String> partSpec =
PartitionPathUtils.extractPartitionSpecFromPath(
fileSplit.getPath());
LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
- partSpec.forEach((k, v) -> partObjects.put(k,
DataTypeUtils.resolvePartition(
- partDefaultName.equals(v) ? null : v,
- fullFieldTypes[fieldNameList.indexOf(k)])));
+ partSpec.forEach((k, v) -> {
+ DataType fieldType = fullFieldTypes[fieldNameList.indexOf(k)];
+ if (!DataTypeUtils.isDatetimeType(fieldType)) {
+ // date-time typed partition fields are formatted specially on the
+ // partition path, so skip them here and read the values directly from
+ // the data file to avoid format mismatch or precision loss
+ partObjects.put(k,
DataTypeUtils.resolvePartition(partDefaultName.equals(v) ? null : v,
fieldType));
+ }
+ });
this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
utcTimestamp,
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 3ca04986fe..61cf52386b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -299,9 +299,14 @@ public class MergeOnReadInputFormat
this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING),
FilePathUtils.extractPartitionKeys(this.conf));
LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
- partSpec.forEach((k, v) -> partObjects.put(k,
DataTypeUtils.resolvePartition(
- defaultPartName.equals(v) ? null : v,
- fieldTypes.get(fieldNames.indexOf(k)))));
+ partSpec.forEach((k, v) -> {
+ DataType fieldType = fieldTypes.get(fieldNames.indexOf(k));
+ if (!DataTypeUtils.isDatetimeType(fieldType)) {
+ // date-time typed partition fields are formatted specially on the
+ // partition path, so skip them here and read the values directly from
+ // the data file to avoid format mismatch or precision loss
+ partObjects.put(k,
DataTypeUtils.resolvePartition(defaultPartName.equals(v) ? null : v,
fieldType));
+ }
+ });
return ParquetSplitReaderUtil.genPartColumnarRowReader(
this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index afe3e809b0..bf3abf74b8 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1041,15 +1041,12 @@ public class ITTestHoodieDataSource extends
AbstractTestBase {
}
@ParameterizedTest
- @EnumSource(value = ExecMode.class)
- void testWriteAndReadWithTimestampPartitioning(ExecMode execMode) {
- // can not read the hive style and timestamp based partitioning table
- // in batch mode, the code path in CopyOnWriteInputFormat relies on
- // the value on the partition path to recover the partition value,
- // but the date format has changed(milliseconds switch to hours).
+ @MethodSource("executionModeAndPartitioningParams")
+ void testWriteAndReadWithTimestampPartitioning(ExecMode execMode, boolean
hiveStylePartitioning) {
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv :
streamTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
.partitionField("ts") // use timestamp as partition path field
.end();
tableEnv.executeSql(hoodieTableDDL);
@@ -1068,6 +1065,26 @@ public class ITTestHoodieDataSource extends
AbstractTestBase {
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
}
+ @Test
+ void testMergeOnReadCompactionWithTimestampPartitioning() {
+ TableEnvironment tableEnv = batchTableEnv;
+
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
+ .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
+ .option(FlinkOptions.COMPACTION_TASKS, 1)
+ .partitionField("ts")
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+ execInsertSql(tableEnv, TestSQL.INSERT_T1);
+
+ List<Row> rows = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+
+ assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+ }
+
@ParameterizedTest
@ValueSource(strings = {FlinkOptions.PARTITION_FORMAT_DAY,
FlinkOptions.PARTITION_FORMAT_DASHED_DAY})
void testWriteAndReadWithDatePartitioning(String partitionFormat) {