This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 12ff562 [HUDI-1678] Row level delete for Flink sink (#2659)
12ff562 is described below
commit 12ff562d2bd77b6dc8676eac79cbd631185dac3c
Author: Danny Chan <[email protected]>
AuthorDate: Thu Mar 11 19:44:06 2021 +0800
[HUDI-1678] Row level delete for Flink sink (#2659)
---
.../transform/RowDataToHoodieFunction.java | 7 +-
.../apache/hudi/operator/TestWriteCopyOnWrite.java | 74 +++++++++++--
.../org/apache/hudi/operator/utils/TestData.java | 117 +++++++++++++--------
.../apache/hudi/source/HoodieDataSourceITCase.java | 10 +-
.../apache/hudi/source/TestHoodieTableSource.java | 2 +-
.../source/TestStreamReadMonitoringFunction.java | 12 +--
.../apache/hudi/source/TestStreamReadOperator.java | 16 +--
.../apache/hudi/source/format/TestInputFormat.java | 14 +--
8 files changed, 169 insertions(+), 83 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java
index 2d47c79..611a277 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java
@@ -19,6 +19,7 @@
package org.apache.hudi.operator.transform;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -33,6 +34,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
import java.io.IOException;
@@ -100,9 +102,12 @@ public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord<?
    final String payloadClazz = this.config.getString(FlinkOptions.PAYLOAD_CLASS);
Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false);
+ final HoodieKey hoodieKey = keyGenerator.getKey(gr);
+ // nullify the payload insert data to mark the record as a DELETE
+ gr = record.getRowKind() == RowKind.DELETE ? null : gr;
HoodieRecordPayload payload = shouldCombine
? StreamerUtil.createPayload(payloadClazz, gr, orderingVal)
: StreamerUtil.createPayload(payloadClazz, gr);
- return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
+ return new HoodieRecord<>(hoodieKey, payload);
}
}
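
The hunk above is the heart of the change: a row whose Flink RowKind is DELETE keeps its HoodieKey but hands a null record to the payload factory, so the payload's insert value later resolves to empty and Hudi treats the record as a delete. A minimal, self-contained sketch of that idea, using plain Java stand-ins rather than the real Hudi/Flink types:

    import java.util.Optional;

    public class DeleteMarkingSketch {
      // Stand-ins for org.apache.flink.types.RowKind and org.apache.hudi.common.model.HoodieKey.
      enum RowKind { INSERT, UPDATE_AFTER, DELETE }
      record Key(String recordKey, String partitionPath) {}
      record SketchRecord(Key key, Optional<String> data) {}

      static SketchRecord toHoodieRecord(RowKind kind, Key key, String payload) {
        // Nullify the payload data for DELETE rows, mirroring the patch above.
        String data = kind == RowKind.DELETE ? null : payload;
        return new SketchRecord(key, Optional.ofNullable(data));
      }

      public static void main(String[] args) {
        SketchRecord upsert = toHoodieRecord(RowKind.INSERT, new Key("id1", "par1"), "Danny,23");
        SketchRecord delete = toHoodieRecord(RowKind.DELETE, new Key("id3", "par2"), "Julian,53");
        System.out.println(upsert.data().isPresent()); // true
        System.out.println(delete.data().isPresent()); // false -> treated as a delete
      }
    }

Run as-is, it prints true then false; the absent data is the tombstone the writer hands to Hudi.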
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteCopyOnWrite.java
index e9c678b..b8c9b3c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/TestWriteCopyOnWrite.java
@@ -114,7 +114,7 @@ public class TestWriteCopyOnWrite {
public void testCheckpoint() throws Exception {
// open the function and ingest data
funcWrapper.openFunction();
- for (RowData rowData : TestData.DATA_SET_ONE) {
+ for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData);
}
@@ -200,7 +200,7 @@ public class TestWriteCopyOnWrite {
    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, null);
    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null);
- for (RowData rowData : TestData.DATA_SET_ONE) {
+ for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData);
}
@@ -215,7 +215,7 @@ public class TestWriteCopyOnWrite {
public void testInsert() throws Exception {
// open the function and ingest data
funcWrapper.openFunction();
- for (RowData rowData : TestData.DATA_SET_ONE) {
+ for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData);
}
@@ -248,7 +248,7 @@ public class TestWriteCopyOnWrite {
// open the function and ingest data
funcWrapper.openFunction();
- for (RowData rowData : TestData.DATA_SET_THREE) {
+ for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData);
}
@@ -267,7 +267,7 @@ public class TestWriteCopyOnWrite {
checkWrittenData(tempFile, EXPECTED3, 1);
// insert duplicates again
- for (RowData rowData : TestData.DATA_SET_THREE) {
+ for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData);
}
@@ -284,7 +284,7 @@ public class TestWriteCopyOnWrite {
public void testUpsert() throws Exception {
// open the function and ingest data
funcWrapper.openFunction();
- for (RowData rowData : TestData.DATA_SET_ONE) {
+ for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData);
}
@@ -301,7 +301,7 @@ public class TestWriteCopyOnWrite {
funcWrapper.checkpointComplete(1);
// upsert another data buffer
- for (RowData rowData : TestData.DATA_SET_TWO) {
+ for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) {
funcWrapper.invoke(rowData);
}
// the data is not flushed yet
@@ -326,6 +326,58 @@ public class TestWriteCopyOnWrite {
}
@Test
+ public void testUpsertWithDelete() throws Exception {
+ // open the function and ingest data
+ funcWrapper.openFunction();
+ for (RowData rowData : TestData.DATA_SET_INSERT) {
+ funcWrapper.invoke(rowData);
+ }
+
+ assertEmptyDataFiles();
+ // this triggers the data write and event send
+ funcWrapper.checkpointFunction(1);
+
+ OperatorEvent nextEvent = funcWrapper.getNextEvent();
+ assertThat("The operator expect to send an event", nextEvent,
instanceOf(BatchWriteSuccessEvent.class));
+
+ funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+ funcWrapper.checkpointComplete(1);
+
+ // upsert another data buffer
+ for (RowData rowData : TestData.DATA_SET_UPDATE_DELETE) {
+ funcWrapper.invoke(rowData);
+ }
+ // the data is not flushed yet
+ checkWrittenData(tempFile, EXPECTED1);
+ // this triggers the data write and event send
+ funcWrapper.checkpointFunction(2);
+
+ String instant = funcWrapper.getWriteClient()
+ .getInflightAndRequestedInstant(getTableType());
+
+ nextEvent = funcWrapper.getNextEvent();
+ assertThat("The operator expect to send an event", nextEvent,
instanceOf(BatchWriteSuccessEvent.class));
+
+ funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
+ funcWrapper.checkpointComplete(2);
+ // the coordinator checkpoint commits the inflight instant.
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+
+ Map<String, String> expected = new HashMap<>();
+ // id3, id5 were deleted and id9 is ignored
+ expected.put("par1", "[id1,par1,id1,Danny,24,1,par1,
id2,par1,id2,Stephen,34,2,par1]");
+ expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]");
+ expected.put("par3", "[id6,par3,id6,Emma,20,6,par3]");
+ expected.put("par4", "[id7,par4,id7,Bob,44,7,par4,
id8,par4,id8,Han,56,8,par4]");
+ checkWrittenData(tempFile, expected);
+ }
+
+ @Test
public void testInsertWithMiniBatches() throws Exception {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.001); // 1Kb batch size
@@ -334,7 +386,7 @@ public class TestWriteCopyOnWrite {
// open the function and ingest data
funcWrapper.openFunction();
    // Each record is 424 bytes. so 3 records expect to trigger a mini-batch write
- for (RowData rowData : TestData.DATA_SET_THREE) {
+ for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData);
}
@@ -369,7 +421,7 @@ public class TestWriteCopyOnWrite {
    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
// insert duplicates again
- for (RowData rowData : TestData.DATA_SET_THREE) {
+ for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData);
}
@@ -401,7 +453,7 @@ public class TestWriteCopyOnWrite {
public void testIndexStateBootstrap() throws Exception {
// open the function and ingest data
funcWrapper.openFunction();
- for (RowData rowData : TestData.DATA_SET_ONE) {
+ for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData);
}
@@ -421,7 +473,7 @@ public class TestWriteCopyOnWrite {
funcWrapper.clearIndexState();
// upsert another data buffer
- for (RowData rowData : TestData.DATA_SET_TWO) {
+ for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) {
funcWrapper.invoke(rowData);
}
checkIndexLoaded(
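
In the new testUpsertWithDelete, the second batch updates id1 and id2, deletes id3 and id5, and sends a delete for id9 that was never inserted, which should be a no-op. A toy model of that merge, with ordinary maps and null standing in for a RowKind.DELETE row (not the actual Hudi write path):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class MergeSketch {
      public static void main(String[] args) {
        // State after the first checkpoint: the affected DATA_SET_INSERT keys (abridged).
        Map<String, String> table = new LinkedHashMap<>();
        table.put("id1", "Danny,23");
        table.put("id2", "Stephen,33");
        table.put("id3", "Julian,53");
        table.put("id5", "Sophia,18");

        // The DATA_SET_UPDATE_DELETE batch: a null value models a DELETE row.
        Map<String, String> batch = new LinkedHashMap<>();
        batch.put("id1", "Danny,24");   // update
        batch.put("id2", "Stephen,34"); // update
        batch.put("id3", null);         // delete
        batch.put("id5", null);         // delete
        batch.put("id9", null);         // delete of a never-inserted key -> no-op

        batch.forEach((key, value) -> {
          if (value == null) {
            table.remove(key); // removing an absent key (id9) is harmless
          } else {
            table.put(key, value);
          }
        });
        System.out.println(table); // {id1=Danny,24, id2=Stephen,34}
      }
    }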
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
index efeb0dd..3b63feb 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
@@ -44,6 +44,7 @@ import org.apache.flink.table.runtime.types.InternalSerializers;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.Strings;
@@ -58,6 +59,7 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -70,100 +72,116 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/** Data set for testing, also some utilities to check the results. */
public class TestData {
- public static List<RowData> DATA_SET_ONE = Arrays.asList(
- binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"),
23,
+ public static List<RowData> DATA_SET_INSERT = Arrays.asList(
+ insertRow(StringData.fromString("id1"), StringData.fromString("Danny"),
23,
TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
- binaryRow(StringData.fromString("id2"),
StringData.fromString("Stephen"), 33,
+ insertRow(StringData.fromString("id2"),
StringData.fromString("Stephen"), 33,
TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
- binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"),
53,
+ insertRow(StringData.fromString("id3"), StringData.fromString("Julian"),
53,
TimestampData.fromEpochMillis(3), StringData.fromString("par2")),
- binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"),
31,
+ insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"),
31,
TimestampData.fromEpochMillis(4), StringData.fromString("par2")),
- binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"),
18,
+ insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"),
18,
TimestampData.fromEpochMillis(5), StringData.fromString("par3")),
- binaryRow(StringData.fromString("id6"), StringData.fromString("Emma"),
20,
+ insertRow(StringData.fromString("id6"), StringData.fromString("Emma"),
20,
TimestampData.fromEpochMillis(6), StringData.fromString("par3")),
- binaryRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
+ insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
TimestampData.fromEpochMillis(7), StringData.fromString("par4")),
- binaryRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
+ insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
);
- public static List<RowData> DATA_SET_TWO = Arrays.asList(
+ public static List<RowData> DATA_SET_UPDATE_INSERT = Arrays.asList(
// advance the age by 1
- binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"),
24,
+ insertRow(StringData.fromString("id1"), StringData.fromString("Danny"),
24,
TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
- binaryRow(StringData.fromString("id2"),
StringData.fromString("Stephen"), 34,
+ insertRow(StringData.fromString("id2"),
StringData.fromString("Stephen"), 34,
TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
- binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"),
54,
+ insertRow(StringData.fromString("id3"), StringData.fromString("Julian"),
54,
TimestampData.fromEpochMillis(3), StringData.fromString("par2")),
- binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"),
32,
+ insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"),
32,
TimestampData.fromEpochMillis(4), StringData.fromString("par2")),
// same with before
- binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"),
18,
+ insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"),
18,
TimestampData.fromEpochMillis(5), StringData.fromString("par3")),
// new data
- binaryRow(StringData.fromString("id9"), StringData.fromString("Jane"),
19,
+ insertRow(StringData.fromString("id9"), StringData.fromString("Jane"),
19,
TimestampData.fromEpochMillis(6), StringData.fromString("par3")),
- binaryRow(StringData.fromString("id10"), StringData.fromString("Ella"),
38,
+ insertRow(StringData.fromString("id10"), StringData.fromString("Ella"),
38,
TimestampData.fromEpochMillis(7), StringData.fromString("par4")),
- binaryRow(StringData.fromString("id11"),
StringData.fromString("Phoebe"), 52,
+ insertRow(StringData.fromString("id11"),
StringData.fromString("Phoebe"), 52,
TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
);
- public static List<RowData> DATA_SET_THREE = new ArrayList<>();
+ public static List<RowData> DATA_SET_INSERT_DUPLICATES = new ArrayList<>();
static {
- IntStream.range(0, 5).forEach(i -> DATA_SET_THREE.add(
- binaryRow(StringData.fromString("id1"),
StringData.fromString("Danny"), 23,
+ IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_DUPLICATES.add(
+ insertRow(StringData.fromString("id1"),
StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), StringData.fromString("par1"))));
}
// data set of test_source.data
- public static List<RowData> DATA_SET_FOUR = Arrays.asList(
- binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"),
23,
+ public static List<RowData> DATA_SET_SOURCE_INSERT = Arrays.asList(
+ insertRow(StringData.fromString("id1"), StringData.fromString("Danny"),
23,
TimestampData.fromEpochMillis(1000), StringData.fromString("par1")),
- binaryRow(StringData.fromString("id2"),
StringData.fromString("Stephen"), 33,
+ insertRow(StringData.fromString("id2"),
StringData.fromString("Stephen"), 33,
TimestampData.fromEpochMillis(2000), StringData.fromString("par1")),
- binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"),
53,
+ insertRow(StringData.fromString("id3"), StringData.fromString("Julian"),
53,
TimestampData.fromEpochMillis(3000), StringData.fromString("par2")),
- binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"),
31,
+ insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"),
31,
TimestampData.fromEpochMillis(4000), StringData.fromString("par2")),
- binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"),
18,
+ insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"),
18,
TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
- binaryRow(StringData.fromString("id6"), StringData.fromString("Emma"),
20,
+ insertRow(StringData.fromString("id6"), StringData.fromString("Emma"),
20,
TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
- binaryRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
+ insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
- binaryRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
+ insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
);
// merged data set of test_source.data and test_source2.data
- public static List<RowData> DATA_SET_FIVE = Arrays.asList(
- binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"),
24,
+ public static List<RowData> DATA_SET_SOURCE_MERGED = Arrays.asList(
+ insertRow(StringData.fromString("id1"), StringData.fromString("Danny"),
24,
TimestampData.fromEpochMillis(1000), StringData.fromString("par1")),
- binaryRow(StringData.fromString("id2"),
StringData.fromString("Stephen"), 34,
+ insertRow(StringData.fromString("id2"),
StringData.fromString("Stephen"), 34,
TimestampData.fromEpochMillis(2000), StringData.fromString("par1")),
- binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"),
54,
+ insertRow(StringData.fromString("id3"), StringData.fromString("Julian"),
54,
TimestampData.fromEpochMillis(3000), StringData.fromString("par2")),
- binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"),
32,
+ insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"),
32,
TimestampData.fromEpochMillis(4000), StringData.fromString("par2")),
- binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"),
18,
+ insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"),
18,
TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
- binaryRow(StringData.fromString("id6"), StringData.fromString("Emma"),
20,
+ insertRow(StringData.fromString("id6"), StringData.fromString("Emma"),
20,
TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
- binaryRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
+ insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
- binaryRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
+ insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
TimestampData.fromEpochMillis(8000), StringData.fromString("par4")),
- binaryRow(StringData.fromString("id9"), StringData.fromString("Jane"),
19,
+ insertRow(StringData.fromString("id9"), StringData.fromString("Jane"),
19,
TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
- binaryRow(StringData.fromString("id10"), StringData.fromString("Ella"),
38,
+ insertRow(StringData.fromString("id10"), StringData.fromString("Ella"),
38,
TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
- binaryRow(StringData.fromString("id11"),
StringData.fromString("Phoebe"), 52,
+ insertRow(StringData.fromString("id11"),
StringData.fromString("Phoebe"), 52,
TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
);
+ public static List<RowData> DATA_SET_UPDATE_DELETE = Arrays.asList(
+ // this is update
+ insertRow(StringData.fromString("id1"), StringData.fromString("Danny"),
24,
+ TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
+ insertRow(StringData.fromString("id2"),
StringData.fromString("Stephen"), 34,
+ TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+ // this is delete
+ deleteRow(StringData.fromString("id3"), StringData.fromString("Julian"),
53,
+ TimestampData.fromEpochMillis(3), StringData.fromString("par2")),
+ deleteRow(StringData.fromString("id5"), StringData.fromString("Sophia"),
18,
+ TimestampData.fromEpochMillis(5), StringData.fromString("par3")),
+ // delete a record that has no inserts
+ deleteRow(StringData.fromString("id9"), StringData.fromString("Jane"),
19,
+ TimestampData.fromEpochMillis(6), StringData.fromString("par3"))
+ );
+
/**
* Returns string format of a list of RowData.
*/
@@ -388,11 +406,16 @@ public class TestData {
List<String> readBuffer = scanner.getRecords().values().stream()
.map(hoodieRecord -> {
try {
-            return filterOutVariables((GenericRecord) hoodieRecord.getData().getInsertValue(schema, new Properties()).get());
+ // in case it is a delete
+ GenericRecord record = (GenericRecord) hoodieRecord.getData()
+ .getInsertValue(schema, new Properties())
+ .orElse(null);
+            return record == null ? (String) null : filterOutVariables(record);
} catch (IOException e) {
throw new RuntimeException(e);
}
})
+ .filter(Objects::nonNull)
.sorted(Comparator.naturalOrder())
.collect(Collectors.toList());
      assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName())));
@@ -437,7 +460,7 @@ public class TestData {
return Strings.join(fields, ",");
}
- private static BinaryRowData binaryRow(Object... fields) {
+ private static BinaryRowData insertRow(Object... fields) {
    LogicalType[] types = TestConfigurations.ROW_TYPE.getFields().stream().map(RowType.RowField::getType)
        .toArray(LogicalType[]::new);
assertEquals(
@@ -458,4 +481,10 @@ public class TestData {
writer.complete();
return row;
}
+
+ private static BinaryRowData deleteRow(Object... fields) {
+ BinaryRowData rowData = insertRow(fields);
+ rowData.setRowKind(RowKind.DELETE);
+ return rowData;
+ }
}
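
The checkWrittenData change above is the read-side counterpart of the null-record convention: a payload built from a null record resolves getInsertValue to empty, which the test maps to null and then drops via Objects::nonNull. A compact sketch of that filtering over plain Optional values, with simplified String payloads instead of Avro records:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;
    import java.util.Optional;
    import java.util.stream.Collectors;

    public class TombstoneFilterSketch {
      public static void main(String[] args) {
        // Simplified stand-ins for hoodieRecord.getData().getInsertValue(...).
        List<Optional<String>> insertValues = Arrays.asList(
            Optional.of("id1,par1,id1,Danny,24,1,par1"),
            Optional.empty(), // a delete: the payload resolves to nothing
            Optional.of("id2,par1,id2,Stephen,34,2,par1"));

        List<String> readBuffer = insertValues.stream()
            .map(v -> v.orElse(null))  // null marks a deleted record
            .filter(Objects::nonNull)  // drop tombstones before asserting
            .sorted()
            .collect(Collectors.toList());
        System.out.println(readBuffer); // only the two live rows remain
      }
    }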
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java
index ca110db..1a4a71d 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java
@@ -95,12 +95,12 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
execInsertSql(streamTableEnv, insertInto);
List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
- assertRowsEquals(rows, TestData.DATA_SET_FOUR);
+ assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
// insert another batch of data
execInsertSql(streamTableEnv, insertInto);
List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
- assertRowsEquals(rows2, TestData.DATA_SET_FOUR);
+ assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT);
}
@Test
@@ -135,7 +135,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
List<Row> rows = execSelectSql(streamTableEnv, "select * from t2", 10);
    // all the data with same keys are appended within one data bucket and one log file,
// so when consume, the same keys are merged
- assertRowsEquals(rows, TestData.DATA_SET_FIVE);
+ assertRowsEquals(rows, TestData.DATA_SET_SOURCE_MERGED);
}
@Test
@@ -156,7 +156,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
List<Row> rows = CollectionUtil.iterableToList(
() -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
- assertRowsEquals(rows, TestData.DATA_SET_FOUR);
+ assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
}
@Test
@@ -182,7 +182,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
List<Row> rows = CollectionUtil.iterableToList(
() -> batchTableEnv.sqlQuery("select * from t1").execute().collect());
- assertRowsEquals(rows, TestData.DATA_SET_FOUR);
+ assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
}
private void execInsertSql(TableEnvironment tEnv, String insert) {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java
index af50cf0..48bd350 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java
@@ -101,7 +101,7 @@ public class TestHoodieTableSource {
@Test
void testGetInputFormat() throws Exception {
// write some data to let the TableSchemaResolver get the right instant
- TestData.writeData(TestData.DATA_SET_ONE, conf);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
HoodieTableSource tableSource = new HoodieTableSource(
TestConfigurations.TABLE_SCHEMA,
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index f02a28c..4733fa0 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -71,7 +71,7 @@ public class TestStreamReadMonitoringFunction {
@Test
public void testConsumeFromLatestCommit() throws Exception {
- TestData.writeData(TestData.DATA_SET_ONE, conf);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
harness.setup();
@@ -95,7 +95,7 @@ public class TestStreamReadMonitoringFunction {
sourceContext.reset(latch);
// write another instant and validate
- TestData.writeData(TestData.DATA_SET_TWO, conf);
+ TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
assertThat("Should produce the expected splits",
@@ -112,8 +112,8 @@ public class TestStreamReadMonitoringFunction {
public void testConsumeFromSpecifiedCommit() throws Exception {
    // write 2 commits first, use the second commit time as the specified start instant,
// all the splits should come from the second commit.
- TestData.writeData(TestData.DATA_SET_ONE, conf);
- TestData.writeData(TestData.DATA_SET_TWO, conf);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+ TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
    String specifiedCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath());
conf.setString(FlinkOptions.READ_STREAMING_START_COMMIT, specifiedCommit);
StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
@@ -141,7 +141,7 @@ public class TestStreamReadMonitoringFunction {
@Test
public void testCheckpointRestore() throws Exception {
- TestData.writeData(TestData.DATA_SET_ONE, conf);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
OperatorSubtaskState state;
@@ -169,7 +169,7 @@ public class TestStreamReadMonitoringFunction {
}
- TestData.writeData(TestData.DATA_SET_TWO, conf);
+ TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
StreamReadMonitoringFunction function2 = TestUtils.getMonitorFunc(conf);
    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function2)) {
harness.setup();
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index e13f950..2daa6c3 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -93,7 +93,7 @@ public class TestStreamReadOperator {
@Test
void testWriteRecords() throws Exception {
- TestData.writeData(TestData.DATA_SET_ONE, conf);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
    try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness = createReader()) {
harness.setup();
harness.open();
@@ -111,9 +111,9 @@ public class TestStreamReadOperator {
assertThat("Should process 1 split", processor.runMailboxStep());
}
// Assert the output has expected elements.
-      TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_ONE);
+      TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_INSERT);
- TestData.writeData(TestData.DATA_SET_TWO, conf);
+ TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
final List<MergeOnReadInputSplit> splits2 = generateSplits(func);
assertThat("Should have 4 splits", splits2.size(), is(4));
for (MergeOnReadInputSplit split : splits2) {
@@ -124,8 +124,8 @@ public class TestStreamReadOperator {
assertThat("Should processed 1 split", processor.runMailboxStep());
}
// The result sets behaves like append only: DATA_SET_ONE + DATA_SET_TWO
- List<RowData> expected = new ArrayList<>(TestData.DATA_SET_ONE);
- expected.addAll(TestData.DATA_SET_TWO);
+ List<RowData> expected = new ArrayList<>(TestData.DATA_SET_INSERT);
+ expected.addAll(TestData.DATA_SET_UPDATE_INSERT);
TestData.assertRowDataEquals(harness.extractOutputValues(), expected);
}
}
@@ -134,7 +134,7 @@ public class TestStreamReadOperator {
public void testCheckpoint() throws Exception {
    // Received emitted splits: split1, split2, split3, split4, checkpoint request is triggered
// when reading records from split1.
- TestData.writeData(TestData.DATA_SET_ONE, conf);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
long timestamp = 0;
    try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness = createReader()) {
harness.setup();
@@ -170,13 +170,13 @@ public class TestStreamReadOperator {
      assertTrue(processor.runMailboxStep(), "Should have processed the split3");
// Assert the output has expected elements.
-      TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_ONE);
+      TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_INSERT);
}
}
@Test
public void testCheckpointRestore() throws Exception {
- TestData.writeData(TestData.DATA_SET_ONE, conf);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
OperatorSubtaskState state;
final List<MergeOnReadInputSplit> splits;
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java
index 7774e56..343f293 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java
@@ -76,18 +76,18 @@ public class TestInputFormat {
void testRead(HoodieTableType tableType) throws Exception {
beforeEach(tableType);
- TestData.writeData(TestData.DATA_SET_ONE, conf);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
List<RowData> result = readData(inputFormat);
String actual = TestData.rowDataToString(result);
- String expected = TestData.rowDataToString(TestData.DATA_SET_ONE);
+ String expected = TestData.rowDataToString(TestData.DATA_SET_INSERT);
assertThat(actual, is(expected));
// write another commit to read again
- TestData.writeData(TestData.DATA_SET_TWO, conf);
+ TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
// refresh the input format
this.tableSource.reloadActiveTimeline();
@@ -116,19 +116,19 @@ public class TestInputFormat {
// write parquet first with compaction
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
- TestData.writeData(TestData.DATA_SET_ONE, conf);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
List<RowData> result = readData(inputFormat);
String actual = TestData.rowDataToString(result);
- String expected = TestData.rowDataToString(TestData.DATA_SET_ONE);
+ String expected = TestData.rowDataToString(TestData.DATA_SET_INSERT);
assertThat(actual, is(expected));
// write another commit using logs and read again
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
- TestData.writeData(TestData.DATA_SET_TWO, conf);
+ TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
// refresh the input format
this.tableSource.reloadActiveTimeline();
@@ -156,7 +156,7 @@ public class TestInputFormat {
void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception {
beforeEach(tableType);
- TestData.writeData(TestData.DATA_SET_ONE, conf);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
Map<String, String> prunedPartitions = new HashMap<>();
prunedPartitions.put("partition", "par1");