This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 27ada3a78a20e26ce6dae2a8b98e26d58461d701 Author: Danny Chan <[email protected]> AuthorDate: Thu Jan 5 19:28:34 2023 +0800 [HUDI-5502] Support insert overwrite for flink mor table with bucket index (#7610) (cherry picked from commit 2a486770cc1421dff9fbeddb1cedec7d0bf456ee) --- .../hudi/common/table/TestTimelineUtils.java | 2 +- .../sink/bucket/BucketStreamWriteFunction.java | 5 +++ .../apache/hudi/table/ITTestHoodieDataSource.java | 42 ++++++++++++++++++---- 3 files changed, 41 insertions(+), 8 deletions(-) diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index 0cb1036eddb..149206f1fec 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -326,7 +326,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { public void verifyTimeline(List<HoodieInstant> expectedInstants, HoodieTimeline timeline) { assertEquals( expectedInstants.stream().sorted().collect(Collectors.toList()), - timeline.getInstants().stream().sorted().collect(Collectors.toList()) + timeline.getInstants().sorted().collect(Collectors.toList()) ); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java index 1ccfe91dbc0..7af12487587 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.index.bucket.BucketIdentifier; import org.apache.hudi.sink.StreamWriteFunction; @@ -143,6 +144,10 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> { * This is a required operation for each restart to avoid having duplicate file ids for one bucket. */ private void bootstrapIndexIfNeed(String partition) { + if (OptionsResolver.isInsertOverwrite(config)) { + // skips the index loading for insert overwrite operation. + return; + } if (bucketIndex.containsKey(partition)) { return; } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 49fb82709a3..0e3e81fecc3 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -624,27 +624,36 @@ public class ITTestHoodieDataSource extends AbstractTestBase { } @ParameterizedTest - @EnumSource(value = ExecMode.class) - void testInsertOverwrite(ExecMode execMode) { - TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv; + @MethodSource("indexAndTableTypeParams") + void testInsertOverwrite(String indexType, HoodieTableType tableType) { + TableEnvironment tableEnv = batchTableEnv; String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.TABLE_TYPE, tableType) + .option(FlinkOptions.INDEX_TYPE, indexType) .end(); tableEnv.executeSql(hoodieTableDDL); execInsertSql(tableEnv, TestSQL.INSERT_T1); // overwrite partition 'par1' and increase in age by 1 - final String insertInto2 = "insert overwrite t1 partition(`partition`='par1') values\n" + final String insertInto1 = "insert overwrite t1 partition(`partition`='par1') values\n" + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01'),\n" + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02')\n"; - execInsertSql(tableEnv, insertInto2); + execInsertSql(tableEnv, insertInto1); List<Row> result1 = CollectionUtil.iterableToList( () -> tableEnv.sqlQuery("select * from t1").execute().collect()); assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT_OVERWRITE); + // execute the same statement again and check the result + execInsertSql(tableEnv, insertInto1); + + List<Row> result2 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + assertRowsEquals(result2, TestData.DATA_SET_SOURCE_INSERT_OVERWRITE); + // overwrite the whole table final String insertInto3 = "insert overwrite t1 values\n" + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01', 'par1'),\n" @@ -652,12 +661,18 @@ public class ITTestHoodieDataSource extends AbstractTestBase { execInsertSql(tableEnv, insertInto3); - List<Row> result2 = CollectionUtil.iterableToList( + List<Row> result3 = CollectionUtil.iterableToList( () -> tableEnv.sqlQuery("select * from t1").execute().collect()); final String expected = "[" + "+I[id1, Danny, 24, 1970-01-01T00:00:01, par1], " + "+I[id2, Stephen, 34, 1970-01-01T00:00:02, par2]]"; - assertRowsEquals(result2, expected); + assertRowsEquals(result3, expected); + + // execute the same statement again and check the result + execInsertSql(tableEnv, insertInto3); + List<Row> result4 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + assertRowsEquals(result4, expected); } @ParameterizedTest @@ -1529,6 +1544,19 @@ public class ITTestHoodieDataSource extends AbstractTestBase { return Stream.of(data).map(Arguments::of); } + /** + * Return test params => (index type, table type). + */ + private static Stream<Arguments> indexAndTableTypeParams() { + Object[][] data = + new Object[][] { + {"FLINK_STATE", HoodieTableType.COPY_ON_WRITE}, + {"FLINK_STATE", HoodieTableType.MERGE_ON_READ}, + {"BUCKET", HoodieTableType.COPY_ON_WRITE}, + {"BUCKET", HoodieTableType.MERGE_ON_READ}}; + return Stream.of(data).map(Arguments::of); + } + private void execInsertSql(TableEnvironment tEnv, String insert) { TableResult tableResult = tEnv.executeSql(insert); // wait to finish
