This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 12ff562  [HUDI-1678] Row level delete for Flink sink (#2659)
12ff562 is described below

commit 12ff562d2bd77b6dc8676eac79cbd631185dac3c
Author: Danny Chan <[email protected]>
AuthorDate: Thu Mar 11 19:44:06 2021 +0800

    [HUDI-1678] Row level delete for Flink sink (#2659)
---
 .../transform/RowDataToHoodieFunction.java         |   7 +-
 .../apache/hudi/operator/TestWriteCopyOnWrite.java |  74 +++++++++++--
 .../org/apache/hudi/operator/utils/TestData.java   | 117 +++++++++++++--------
 .../apache/hudi/source/HoodieDataSourceITCase.java |  10 +-
 .../apache/hudi/source/TestHoodieTableSource.java  |   2 +-
 .../source/TestStreamReadMonitoringFunction.java   |  12 +--
 .../apache/hudi/source/TestStreamReadOperator.java |  16 +--
 .../apache/hudi/source/format/TestInputFormat.java |  14 +--
 8 files changed, 169 insertions(+), 83 deletions(-)
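
In short: the sink now inspects each row's RowKind before converting it into a HoodieRecord, so a DELETE row flows through the same write path as an upsert. A condensed sketch of the new mapping logic (simplified from the RowDataToHoodieFunction hunk below; the shouldCombine branch and generics are trimmed):

    // Simplified sketch of the change in RowDataToHoodieFunction (see diff below).
    HoodieKey key = keyGenerator.getKey(avroRecord);      // key is extracted first
    if (rowData.getRowKind() == RowKind.DELETE) {
      avroRecord = null;                                  // a null record marks the delete
    }
    HoodieRecordPayload payload = StreamerUtil.createPayload(payloadClazz, avroRecord, orderingVal);
    return new HoodieRecord<>(key, payload);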

diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java
index 2d47c79..611a277 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.operator.transform;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -33,6 +34,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
 
 import java.io.IOException;
 
@@ -100,9 +102,12 @@ public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord<?
     final String payloadClazz = this.config.getString(FlinkOptions.PAYLOAD_CLASS);
     Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
         this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false);
+    final HoodieKey hoodieKey = keyGenerator.getKey(gr);
+    // nullify the payload insert data to mark the record as a DELETE
+    gr = record.getRowKind() == RowKind.DELETE ? null : gr;
     HoodieRecordPayload payload = shouldCombine
         ? StreamerUtil.createPayload(payloadClazz, gr, orderingVal)
         : StreamerUtil.createPayload(payloadClazz, gr);
-    return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
+    return new HoodieRecord<>(hoodieKey, payload);
   }
 }
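
The ordering above matters: the HoodieKey must be computed before the record is nullified, otherwise the key generator would have nothing to read. The nulled-out payload then acts as a tombstone because its insert value resolves to an empty Option. A minimal sketch of that contract, assuming the default OverwriteWithLatestAvroPayload (the Hudi and Avro types are real; the sketch class itself is illustrative, not part of this commit):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.generic.IndexedRecord;
    import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
    import org.apache.hudi.common.util.Option;

    import java.io.IOException;

    class DeleteContractSketch {
      // A payload built from a null record carries no insert data.
      static OverwriteWithLatestAvroPayload deletePayload(Comparable orderingVal) {
        return new OverwriteWithLatestAvroPayload((GenericRecord) null, orderingVal);
      }

      // getInsertValue then returns Option.empty(), which the write handles
      // treat as "delete this key" when merging file groups.
      static boolean isDelete(OverwriteWithLatestAvroPayload payload, Schema schema) throws IOException {
        Option<IndexedRecord> value = payload.getInsertValue(schema);
        return !value.isPresent();
      }
    }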
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteCopyOnWrite.java
index e9c678b..b8c9b3c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteCopyOnWrite.java
@@ -114,7 +114,7 @@ public class TestWriteCopyOnWrite {
   public void testCheckpoint() throws Exception {
     // open the function and ingest data
     funcWrapper.openFunction();
-    for (RowData rowData : TestData.DATA_SET_ONE) {
+    for (RowData rowData : TestData.DATA_SET_INSERT) {
       funcWrapper.invoke(rowData);
     }
 
@@ -200,7 +200,7 @@ public class TestWriteCopyOnWrite {
     checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, null);
     checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null);
 
-    for (RowData rowData : TestData.DATA_SET_ONE) {
+    for (RowData rowData : TestData.DATA_SET_INSERT) {
       funcWrapper.invoke(rowData);
     }
 
@@ -215,7 +215,7 @@ public class TestWriteCopyOnWrite {
   public void testInsert() throws Exception {
     // open the function and ingest data
     funcWrapper.openFunction();
-    for (RowData rowData : TestData.DATA_SET_ONE) {
+    for (RowData rowData : TestData.DATA_SET_INSERT) {
       funcWrapper.invoke(rowData);
     }
 
@@ -248,7 +248,7 @@ public class TestWriteCopyOnWrite {
 
     // open the function and ingest data
     funcWrapper.openFunction();
-    for (RowData rowData : TestData.DATA_SET_THREE) {
+    for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
       funcWrapper.invoke(rowData);
     }
 
@@ -267,7 +267,7 @@ public class TestWriteCopyOnWrite {
     checkWrittenData(tempFile, EXPECTED3, 1);
 
     // insert duplicates again
-    for (RowData rowData : TestData.DATA_SET_THREE) {
+    for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
       funcWrapper.invoke(rowData);
     }
 
@@ -284,7 +284,7 @@ public class TestWriteCopyOnWrite {
   public void testUpsert() throws Exception {
     // open the function and ingest data
     funcWrapper.openFunction();
-    for (RowData rowData : TestData.DATA_SET_ONE) {
+    for (RowData rowData : TestData.DATA_SET_INSERT) {
       funcWrapper.invoke(rowData);
     }
 
@@ -301,7 +301,7 @@ public class TestWriteCopyOnWrite {
     funcWrapper.checkpointComplete(1);
 
     // upsert another data buffer
-    for (RowData rowData : TestData.DATA_SET_TWO) {
+    for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) {
       funcWrapper.invoke(rowData);
     }
     // the data is not flushed yet
@@ -326,6 +326,58 @@ public class TestWriteCopyOnWrite {
   }
 
   @Test
+  public void testUpsertWithDelete() throws Exception {
+    // open the function and ingest data
+    funcWrapper.openFunction();
+    for (RowData rowData : TestData.DATA_SET_INSERT) {
+      funcWrapper.invoke(rowData);
+    }
+
+    assertEmptyDataFiles();
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+
+    OperatorEvent nextEvent = funcWrapper.getNextEvent();
+    assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    funcWrapper.checkpointComplete(1);
+
+    // upsert another data buffer
+    for (RowData rowData : TestData.DATA_SET_UPDATE_DELETE) {
+      funcWrapper.invoke(rowData);
+    }
+    // the data is not flushed yet
+    checkWrittenData(tempFile, EXPECTED1);
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(2);
+
+    String instant = funcWrapper.getWriteClient()
+        .getInflightAndRequestedInstant(getTableType());
+
+    nextEvent = funcWrapper.getNextEvent();
+    assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
+    funcWrapper.checkpointComplete(2);
+    // the coordinator checkpoint commits the inflight instant.
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+
+    Map<String, String> expected = new HashMap<>();
+    // id3, id5 were deleted and id9 is ignored
+    expected.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
+    expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]");
+    expected.put("par3", "[id6,par3,id6,Emma,20,6,par3]");
+    expected.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
+    checkWrittenData(tempFile, expected);
+  }
+
+  @Test
   public void testInsertWithMiniBatches() throws Exception {
     // reset the config option
     conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.001); // 1Kb batch size
@@ -334,7 +386,7 @@ public class TestWriteCopyOnWrite {
     // open the function and ingest data
     funcWrapper.openFunction();
     // Each record is 424 bytes. so 3 records expect to trigger a mini-batch write
-    for (RowData rowData : TestData.DATA_SET_THREE) {
+    for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
       funcWrapper.invoke(rowData);
     }
 
@@ -369,7 +421,7 @@ public class TestWriteCopyOnWrite {
     checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
 
     // insert duplicates again
-    for (RowData rowData : TestData.DATA_SET_THREE) {
+    for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
       funcWrapper.invoke(rowData);
     }
 
@@ -401,7 +453,7 @@ public class TestWriteCopyOnWrite {
   public void testIndexStateBootstrap() throws Exception {
     // open the function and ingest data
     funcWrapper.openFunction();
-    for (RowData rowData : TestData.DATA_SET_ONE) {
+    for (RowData rowData : TestData.DATA_SET_INSERT) {
       funcWrapper.invoke(rowData);
     }
 
@@ -421,7 +473,7 @@ public class TestWriteCopyOnWrite {
     funcWrapper.clearIndexState();
 
     // upsert another data buffer
-    for (RowData rowData : TestData.DATA_SET_TWO) {
+    for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) {
       funcWrapper.invoke(rowData);
     }
     checkIndexLoaded(
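
The testUpsertWithDelete flow added above pins down the per-key semantics: an UPDATE replaces the stored row, a DELETE removes it, and a DELETE for a key that was never written (id9) is silently ignored. A hedged plain-Java sketch of that contract (the map stands in for a Hudi file group; nothing here is Hudi API):

    import java.util.HashMap;
    import java.util.Map;

    class UpsertDeleteSemantics {
      public static void main(String[] args) {
        Map<String, String> fileGroup = new HashMap<>();
        // first checkpoint: DATA_SET_INSERT lands the initial rows
        fileGroup.put("id1", "Danny,23");
        fileGroup.put("id3", "Julian,53");
        fileGroup.put("id5", "Sophia,18");

        // second checkpoint: DATA_SET_UPDATE_DELETE is merged on top
        fileGroup.put("id1", "Danny,24");  // UPDATE replaces by key
        fileGroup.remove("id3");           // DELETE drops the key
        fileGroup.remove("id5");           // DELETE drops the key
        fileGroup.remove("id9");           // DELETE of an absent key: no-op

        System.out.println(fileGroup);     // {id1=Danny,24} -- id3/id5 gone, id9 ignored
      }
    }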
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
index efeb0dd..3b63feb 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
@@ -44,6 +44,7 @@ import org.apache.flink.table.runtime.types.InternalSerializers;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.Strings;
@@ -58,6 +59,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -70,100 +72,116 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Data set for testing, also some utilities to check the results. */
 public class TestData {
-  public static List<RowData> DATA_SET_ONE = Arrays.asList(
-      binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+  public static List<RowData> DATA_SET_INSERT = Arrays.asList(
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
           TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
-      binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
+      insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
           TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
-      binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,
+      insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,
           TimestampData.fromEpochMillis(3), StringData.fromString("par2")),
-      binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31,
+      insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31,
           TimestampData.fromEpochMillis(4), StringData.fromString("par2")),
-      binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
+      insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
           TimestampData.fromEpochMillis(5), StringData.fromString("par3")),
-      binaryRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20,
+      insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20,
           TimestampData.fromEpochMillis(6), StringData.fromString("par3")),
-      binaryRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
+      insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
           TimestampData.fromEpochMillis(7), StringData.fromString("par4")),
-      binaryRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
+      insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
           TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
   );
 
-  public static List<RowData> DATA_SET_TWO = Arrays.asList(
+  public static List<RowData> DATA_SET_UPDATE_INSERT = Arrays.asList(
       // advance the age by 1
-      binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
           TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
-      binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34,
+      insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34,
           TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
-      binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54,
+      insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54,
           TimestampData.fromEpochMillis(3), StringData.fromString("par2")),
-      binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32,
+      insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32,
           TimestampData.fromEpochMillis(4), StringData.fromString("par2")),
       // same with before
-      binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
+      insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
           TimestampData.fromEpochMillis(5), StringData.fromString("par3")),
       // new data
-      binaryRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19,
+      insertRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19,
           TimestampData.fromEpochMillis(6), StringData.fromString("par3")),
-      binaryRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38,
+      insertRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38,
           TimestampData.fromEpochMillis(7), StringData.fromString("par4")),
-      binaryRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52,
+      insertRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52,
           TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
   );
 
-  public static List<RowData> DATA_SET_THREE = new ArrayList<>();
+  public static List<RowData> DATA_SET_INSERT_DUPLICATES = new ArrayList<>();
   static {
-    IntStream.range(0, 5).forEach(i -> DATA_SET_THREE.add(
-        binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+    IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_DUPLICATES.add(
+        insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
             TimestampData.fromEpochMillis(1), StringData.fromString("par1"))));
   }
 
   // data set of test_source.data
-  public static List<RowData> DATA_SET_FOUR = Arrays.asList(
-      binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+  public static List<RowData> DATA_SET_SOURCE_INSERT = Arrays.asList(
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
           TimestampData.fromEpochMillis(1000), StringData.fromString("par1")),
-      binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
+      insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
           TimestampData.fromEpochMillis(2000), StringData.fromString("par1")),
-      binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,
+      insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,
           TimestampData.fromEpochMillis(3000), StringData.fromString("par2")),
-      binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31,
+      insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31,
           TimestampData.fromEpochMillis(4000), StringData.fromString("par2")),
-      binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
+      insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
           TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
-      binaryRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20,
+      insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20,
           TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
-      binaryRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
+      insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
           TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
-      binaryRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
+      insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
           TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
   );
 
   // merged data set of test_source.data and test_source2.data
-  public static List<RowData> DATA_SET_FIVE = Arrays.asList(
-      binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
+  public static List<RowData> DATA_SET_SOURCE_MERGED = Arrays.asList(
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
           TimestampData.fromEpochMillis(1000), StringData.fromString("par1")),
-      binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34,
+      insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34,
           TimestampData.fromEpochMillis(2000), StringData.fromString("par1")),
-      binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54,
+      insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54,
           TimestampData.fromEpochMillis(3000), StringData.fromString("par2")),
-      binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32,
+      insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32,
           TimestampData.fromEpochMillis(4000), StringData.fromString("par2")),
-      binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
+      insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
           TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
-      binaryRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20,
+      insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20,
           TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
-      binaryRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
+      insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
           TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
-      binaryRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
+      insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
           TimestampData.fromEpochMillis(8000), StringData.fromString("par4")),
-      binaryRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19,
+      insertRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19,
           TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
-      binaryRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38,
+      insertRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38,
           TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
-      binaryRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52,
+      insertRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52,
           TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
   );
 
+  public static List<RowData> DATA_SET_UPDATE_DELETE = Arrays.asList(
+      // this is update
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      // this is delete
+      deleteRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,
+          TimestampData.fromEpochMillis(3), StringData.fromString("par2")),
+      deleteRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
+          TimestampData.fromEpochMillis(5), StringData.fromString("par3")),
+      // delete a record that has no inserts
+      deleteRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19,
+          TimestampData.fromEpochMillis(6), StringData.fromString("par3"))
+  );
+
   /**
    * Returns string format of a list of RowData.
    */
@@ -388,11 +406,16 @@ public class TestData {
       List<String> readBuffer = scanner.getRecords().values().stream()
           .map(hoodieRecord -> {
             try {
-              return filterOutVariables((GenericRecord) hoodieRecord.getData().getInsertValue(schema, new Properties()).get());
+              // in case it is a delete
+              GenericRecord record = (GenericRecord) hoodieRecord.getData()
+                  .getInsertValue(schema, new Properties())
+                  .orElse(null);
+              return record == null ? (String) null : filterOutVariables(record);
             } catch (IOException e) {
               throw new RuntimeException(e);
             }
           })
+          .filter(Objects::nonNull)
           .sorted(Comparator.naturalOrder())
           .collect(Collectors.toList());
       assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName())));
@@ -437,7 +460,7 @@ public class TestData {
     return Strings.join(fields, ",");
   }
 
-  private static BinaryRowData binaryRow(Object... fields) {
+  private static BinaryRowData insertRow(Object... fields) {
     LogicalType[] types = TestConfigurations.ROW_TYPE.getFields().stream().map(RowType.RowField::getType)
         .toArray(LogicalType[]::new);
     assertEquals(
@@ -458,4 +481,10 @@ public class TestData {
     writer.complete();
     return row;
   }
+
+  private static BinaryRowData deleteRow(Object... fields) {
+    BinaryRowData rowData = insertRow(fields);
+    rowData.setRowKind(RowKind.DELETE);
+    return rowData;
+  }
 }
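
The deleteRow helper above works because RowKind travels in the RowData header rather than in the fields, so a delete row reuses insertRow's binary layout and only flips the kind flag. A small usage sketch, assuming Flink's BinaryRowData API (real Flink types; the demo method is illustrative):

    import org.apache.flink.table.data.binary.BinaryRowData;
    import org.apache.flink.types.RowKind;

    class RowKindSketch {
      static void tagAsDelete(BinaryRowData row) {
        // a freshly written BinaryRowData defaults to RowKind.INSERT
        assert row.getRowKind() == RowKind.INSERT;
        // flipping the kind touches only the header, not the field data
        row.setRowKind(RowKind.DELETE);
      }
    }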
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java
index ca110db..1a4a71d 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java
@@ -95,12 +95,12 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     execInsertSql(streamTableEnv, insertInto);
 
     List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
-    assertRowsEquals(rows, TestData.DATA_SET_FOUR);
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
 
     // insert another batch of data
     execInsertSql(streamTableEnv, insertInto);
     List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
-    assertRowsEquals(rows2, TestData.DATA_SET_FOUR);
+    assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT);
   }
 
   @Test
@@ -135,7 +135,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     List<Row> rows = execSelectSql(streamTableEnv, "select * from t2", 10);
     // all the data with same keys are appended within one data bucket and one log file,
     // so when consume, the same keys are merged
-    assertRowsEquals(rows, TestData.DATA_SET_FIVE);
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_MERGED);
   }
 
   @Test
@@ -156,7 +156,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
 
     List<Row> rows = CollectionUtil.iterableToList(
         () -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
-    assertRowsEquals(rows, TestData.DATA_SET_FOUR);
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
   }
 
   @Test
@@ -182,7 +182,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
 
     List<Row> rows = CollectionUtil.iterableToList(
         () -> batchTableEnv.sqlQuery("select * from t1").execute().collect());
-    assertRowsEquals(rows, TestData.DATA_SET_FOUR);
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
   }
 
   private void execInsertSql(TableEnvironment tEnv, String insert) {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java
index af50cf0..48bd350 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java
@@ -101,7 +101,7 @@ public class TestHoodieTableSource {
   @Test
   void testGetInputFormat() throws Exception {
     // write some data to let the TableSchemaResolver get the right instant
-    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
 
     HoodieTableSource tableSource = new HoodieTableSource(
         TestConfigurations.TABLE_SCHEMA,
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index f02a28c..4733fa0 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -71,7 +71,7 @@ public class TestStreamReadMonitoringFunction {
 
   @Test
   public void testConsumeFromLatestCommit() throws Exception {
-    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
     StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
     try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
       harness.setup();
@@ -95,7 +95,7 @@ public class TestStreamReadMonitoringFunction {
       sourceContext.reset(latch);
 
       // write another instant and validate
-      TestData.writeData(TestData.DATA_SET_TWO, conf);
+      TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
 
       assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
       assertThat("Should produce the expected splits",
@@ -112,8 +112,8 @@ public class TestStreamReadMonitoringFunction {
   public void testConsumeFromSpecifiedCommit() throws Exception {
     // write 2 commits first, use the second commit time as the specified start instant,
     // all the splits should come from the second commit.
-    TestData.writeData(TestData.DATA_SET_ONE, conf);
-    TestData.writeData(TestData.DATA_SET_TWO, conf);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
     String specifiedCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath());
     conf.setString(FlinkOptions.READ_STREAMING_START_COMMIT, specifiedCommit);
     StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
@@ -141,7 +141,7 @@ public class TestStreamReadMonitoringFunction {
 
   @Test
   public void testCheckpointRestore() throws Exception {
-    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
 
     StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
     OperatorSubtaskState state;
@@ -169,7 +169,7 @@ public class TestStreamReadMonitoringFunction {
 
     }
 
-    TestData.writeData(TestData.DATA_SET_TWO, conf);
+    TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
     StreamReadMonitoringFunction function2 = TestUtils.getMonitorFunc(conf);
     try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function2)) {
       harness.setup();
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index e13f950..2daa6c3 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -93,7 +93,7 @@ public class TestStreamReadOperator {
 
   @Test
   void testWriteRecords() throws Exception {
-    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
     try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness = createReader()) {
       harness.setup();
       harness.open();
@@ -111,9 +111,9 @@ public class TestStreamReadOperator {
         assertThat("Should process 1 split", processor.runMailboxStep());
       }
       // Assert the output has expected elements.
-      TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_ONE);
+      TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_INSERT);
 
-      TestData.writeData(TestData.DATA_SET_TWO, conf);
+      TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
       final List<MergeOnReadInputSplit> splits2 = generateSplits(func);
       assertThat("Should have 4 splits", splits2.size(), is(4));
       for (MergeOnReadInputSplit split : splits2) {
@@ -124,8 +124,8 @@ public class TestStreamReadOperator {
         assertThat("Should processed 1 split", processor.runMailboxStep());
       }
       // The result sets behaves like append only: DATA_SET_ONE + DATA_SET_TWO
-      List<RowData> expected = new ArrayList<>(TestData.DATA_SET_ONE);
-      expected.addAll(TestData.DATA_SET_TWO);
+      List<RowData> expected = new ArrayList<>(TestData.DATA_SET_INSERT);
+      expected.addAll(TestData.DATA_SET_UPDATE_INSERT);
       TestData.assertRowDataEquals(harness.extractOutputValues(), expected);
     }
   }
@@ -134,7 +134,7 @@ public class TestStreamReadOperator {
   public void testCheckpoint() throws Exception {
     // Received emitted splits: split1, split2, split3, split4, checkpoint request is triggered
     // when reading records from split1.
-    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
     long timestamp = 0;
     try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness = createReader()) {
       harness.setup();
@@ -170,13 +170,13 @@ public class TestStreamReadOperator {
       assertTrue(processor.runMailboxStep(), "Should have processed the split3");
 
       // Assert the output has expected elements.
-      TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_ONE);
+      TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_INSERT);
     }
   }
 
   @Test
   public void testCheckpointRestore() throws Exception {
-    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
 
     OperatorSubtaskState state;
     final List<MergeOnReadInputSplit> splits;
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java
index 7774e56..343f293 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java
@@ -76,18 +76,18 @@ public class TestInputFormat {
   void testRead(HoodieTableType tableType) throws Exception {
     beforeEach(tableType);
 
-    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
 
     InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
 
     List<RowData> result = readData(inputFormat);
 
     String actual = TestData.rowDataToString(result);
-    String expected = TestData.rowDataToString(TestData.DATA_SET_ONE);
+    String expected = TestData.rowDataToString(TestData.DATA_SET_INSERT);
     assertThat(actual, is(expected));
 
     // write another commit to read again
-    TestData.writeData(TestData.DATA_SET_TWO, conf);
+    TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
 
     // refresh the input format
     this.tableSource.reloadActiveTimeline();
@@ -116,19 +116,19 @@ public class TestInputFormat {
 
     // write parquet first with compaction
     conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
-    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
 
     InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
 
     List<RowData> result = readData(inputFormat);
 
     String actual = TestData.rowDataToString(result);
-    String expected = TestData.rowDataToString(TestData.DATA_SET_ONE);
+    String expected = TestData.rowDataToString(TestData.DATA_SET_INSERT);
     assertThat(actual, is(expected));
 
     // write another commit using logs and read again
     conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
-    TestData.writeData(TestData.DATA_SET_TWO, conf);
+    TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
 
     // refresh the input format
     this.tableSource.reloadActiveTimeline();
@@ -156,7 +156,7 @@ public class TestInputFormat {
   void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception {
     beforeEach(tableType);
 
-    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
 
     Map<String, String> prunedPartitions = new HashMap<>();
     prunedPartitions.put("partition", "par1");
