This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 27ada3a78a20e26ce6dae2a8b98e26d58461d701
Author: Danny Chan <[email protected]>
AuthorDate: Thu Jan 5 19:28:34 2023 +0800

    [HUDI-5502] Support insert overwrite for flink mor table with bucket index (#7610)
    
    (cherry picked from commit 2a486770cc1421dff9fbeddb1cedec7d0bf456ee)
---
 .../hudi/common/table/TestTimelineUtils.java       |  2 +-
 .../sink/bucket/BucketStreamWriteFunction.java     |  5 +++
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 42 ++++++++++++++++++----
 3 files changed, 41 insertions(+), 8 deletions(-)

diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index 0cb1036eddb..149206f1fec 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -326,7 +326,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
   public void verifyTimeline(List<HoodieInstant> expectedInstants, HoodieTimeline timeline) {
     assertEquals(
         expectedInstants.stream().sorted().collect(Collectors.toList()),
-        timeline.getInstants().stream().sorted().collect(Collectors.toList())
+        timeline.getInstants().sorted().collect(Collectors.toList())
     );
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 1ccfe91dbc0..7af12487587 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 import org.apache.hudi.sink.StreamWriteFunction;
 
@@ -143,6 +144,10 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
    * This is a required operation for each restart to avoid having duplicate file ids for one bucket.
    */
   private void bootstrapIndexIfNeed(String partition) {
+    if (OptionsResolver.isInsertOverwrite(config)) {
+      // skips the index loading for insert overwrite operation.
+      return;
+    }
     if (bucketIndex.containsKey(partition)) {
       return;
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 49fb82709a3..0e3e81fecc3 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -624,27 +624,36 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
   }
 
   @ParameterizedTest
-  @EnumSource(value = ExecMode.class)
-  void testInsertOverwrite(ExecMode execMode) {
-    TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
+  @MethodSource("indexAndTableTypeParams")
+  void testInsertOverwrite(String indexType, HoodieTableType tableType) {
+    TableEnvironment tableEnv = batchTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.INDEX_TYPE, indexType)
         .end();
     tableEnv.executeSql(hoodieTableDDL);
 
     execInsertSql(tableEnv, TestSQL.INSERT_T1);
 
     // overwrite partition 'par1' and increase in age by 1
-    final String insertInto2 = "insert overwrite t1 partition(`partition`='par1') values\n"
+    final String insertInto1 = "insert overwrite t1 partition(`partition`='par1') values\n"
         + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01'),\n"
         + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02')\n";
 
-    execInsertSql(tableEnv, insertInto2);
+    execInsertSql(tableEnv, insertInto1);
 
     List<Row> result1 = CollectionUtil.iterableToList(
         () -> tableEnv.sqlQuery("select * from t1").execute().collect());
     assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT_OVERWRITE);
 
+    // execute the same statement again and check the result
+    execInsertSql(tableEnv, insertInto1);
+
+    List<Row> result2 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result2, TestData.DATA_SET_SOURCE_INSERT_OVERWRITE);
+
     // overwrite the whole table
     final String insertInto3 = "insert overwrite t1 values\n"
         + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01', 'par1'),\n"
@@ -652,12 +661,18 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
 
     execInsertSql(tableEnv, insertInto3);
 
-    List<Row> result2 = CollectionUtil.iterableToList(
+    List<Row> result3 = CollectionUtil.iterableToList(
         () -> tableEnv.sqlQuery("select * from t1").execute().collect());
     final String expected = "["
         + "+I[id1, Danny, 24, 1970-01-01T00:00:01, par1], "
         + "+I[id2, Stephen, 34, 1970-01-01T00:00:02, par2]]";
-    assertRowsEquals(result2, expected);
+    assertRowsEquals(result3, expected);
+
+    // execute the same statement again and check the result
+    execInsertSql(tableEnv, insertInto3);
+    List<Row> result4 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result4, expected);
   }
 
   @ParameterizedTest
@@ -1529,6 +1544,19 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
     return Stream.of(data).map(Arguments::of);
   }
 
+  /**
+   * Return test params => (index type, table type).
+   */
+  private static Stream<Arguments> indexAndTableTypeParams() {
+    Object[][] data =
+        new Object[][] {
+            {"FLINK_STATE", HoodieTableType.COPY_ON_WRITE},
+            {"FLINK_STATE", HoodieTableType.MERGE_ON_READ},
+            {"BUCKET", HoodieTableType.COPY_ON_WRITE},
+            {"BUCKET", HoodieTableType.MERGE_ON_READ}};
+    return Stream.of(data).map(Arguments::of);
+  }
+
   private void execInsertSql(TableEnvironment tEnv, String insert) {
     TableResult tableResult = tEnv.executeSql(insert);
     // wait to finish

Reply via email to