This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 19d1094a73b [HUDI-6195] Test-cover different payload classes when using HoodieMergedReadHandle (#8675)
19d1094a73b is described below
commit 19d1094a73b845fcd2ac884dafb6c8444d161bbc
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri May 12 14:10:11 2023 +0800
[HUDI-6195] Test-cover different payload classes when using HoodieMergedReadHandle (#8675)
---
.../apache/hudi/io/TestHoodieMergedReadHandle.java | 187 +++++++++--------
.../HoodieAdaptablePayloadDataGenerator.java | 228 +++++++++++++++++++++
.../hudi/common/testutils/RawTripTestPayload.java | 23 +++
.../src/test/resources/adaptable-payload.avsc | 82 ++++++++
4 files changed, 431 insertions(+), 89 deletions(-)
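In outline, this patch replaces a DefaultHoodieRecordPayload-only test with a matrix over table types and payload classes, backed by a new adaptable test-data generator. The read-then-assert step the tests repeat is the merged read handle lookup; condensed from the test body below (the latestCommitTime lookup and file-slice listing are elided here):

    HoodieTable table = HoodieSparkTable.create(writeConfig, context(), metaClient);
    HoodieMergedReadHandle mergedReadHandle = new HoodieMergedReadHandle<>(
        writeConfig, Option.of(latestCommitTime), table, partitionPathAndFileIDPairs.get(0));
    List<HoodieRecord> mergedRecords = mergedReadHandle.getMergedRecords();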
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java
index 552dcb08a6d..6a93e5499c4 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java
@@ -20,14 +20,17 @@
package org.apache.hudi.io;
import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.testutils.RawTripTestPayload;
+import org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodiePayloadConfig;
@@ -38,113 +41,139 @@ import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
-import java.util.Properties;
import java.util.stream.Collectors;
-
-import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
-import static org.apache.hudi.avro.HoodieAvroUtils.createHoodieRecordFromAvro;
-import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
-import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.SCHEMA;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.SCHEMA_STR;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.SCHEMA_WITH_METAFIELDS;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getDeletes;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getInserts;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getKeyGenProps;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getPayloadProps;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getUpdates;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieMergedReadHandle extends SparkClientFunctionalTestHarness {
- private HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+ private static Stream<Arguments> avroPayloadClasses() {
+ return Stream.of(
+ Arguments.of(COPY_ON_WRITE, OverwriteWithLatestAvroPayload.class),
+ Arguments.of(COPY_ON_WRITE, OverwriteNonDefaultsWithLatestAvroPayload.class),
+ Arguments.of(COPY_ON_WRITE, PartialUpdateAvroPayload.class),
+ Arguments.of(COPY_ON_WRITE, DefaultHoodieRecordPayload.class),
+ Arguments.of(MERGE_ON_READ, OverwriteWithLatestAvroPayload.class),
+ Arguments.of(MERGE_ON_READ, OverwriteNonDefaultsWithLatestAvroPayload.class),
+ Arguments.of(MERGE_ON_READ, PartialUpdateAvroPayload.class),
+ Arguments.of(MERGE_ON_READ, DefaultHoodieRecordPayload.class)
+ );
+ }
@ParameterizedTest
- @EnumSource(HoodieTableType.class)
- public void testReadLatestRecordsWithDeletes(HoodieTableType tableType) throws IOException {
- HoodieWriteConfig writeConfig = getWriteConfig();
+ @MethodSource("avroPayloadClasses")
+ public void testReadLatestRecordsWithDeletes(HoodieTableType tableType, Class<?> payloadClass) throws IOException {
+ HoodieWriteConfig writeConfig = getWriteConfig(payloadClass);
HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType, writeConfig.getProps());
try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
final int totalRecords = 4;
+ final String partition = "foo";
+
// 1st batch: inserts
String commitTimeAtEpoch0 = getCommitTimeAtUTC(0);
- List<HoodieRecord> insertsAtEpoch0 = convertPayload(dataGen
- .generateInsertsForPartition(commitTimeAtEpoch0, totalRecords, DEFAULT_FIRST_PARTITION_PATH), DefaultHoodieRecordPayload.class);
+ List<HoodieRecord> insertsAtEpoch0 = getInserts(totalRecords, partition, 0, payloadClass);
client.startCommitWithTime(commitTimeAtEpoch0);
client.upsert(jsc().parallelize(insertsAtEpoch0, 1), commitTimeAtEpoch0);
- validate(metaClient, writeConfig, totalRecords, commitTimeAtEpoch0, DEFAULT_FIRST_PARTITION_PATH, 0);
+ doMergedReadAndValidate(metaClient, writeConfig, totalRecords, partition, 0, payloadClass);
// 2nd batch: normal updates
String commitTimeAtEpoch5 = getCommitTimeAtUTC(5);
- List<HoodieRecord> updatesAtEpoch5 = convertPayload(dataGen
- .generateUpdatesWithTimestamp(commitTimeAtEpoch5, insertsAtEpoch0, 5), DefaultHoodieRecordPayload.class);
+ List<HoodieRecord> updatesAtEpoch5 = getUpdates(insertsAtEpoch0, 5, payloadClass);
client.startCommitWithTime(commitTimeAtEpoch5);
- client.upsert(jsc().parallelize(updatesAtEpoch5, 1), commitTimeAtEpoch5);
- validate(metaClient, writeConfig, totalRecords, commitTimeAtEpoch5, DEFAULT_FIRST_PARTITION_PATH, 5);
+ JavaRDD<WriteStatus> writeStatusesAtEpoch5 = client.upsert(jsc().parallelize(updatesAtEpoch5, 1), commitTimeAtEpoch5);
+ assertNoWriteErrors(writeStatusesAtEpoch5.collect());
+ doMergedReadAndValidate(metaClient, writeConfig, totalRecords, partition, 5, payloadClass);
- // 3rd batch: delete a record
+ // 3rd batch: delete the record with id 3 (the last one)
String commitTimeAtEpoch6 = getCommitTimeAtUTC(6);
client.startCommitWithTime(commitTimeAtEpoch6);
- List<HoodieRecord> deletesAtEpoch6 = convertPayload(dataGen
- .generateUpdatesWithTimestamp(commitTimeAtEpoch6, Collections.singletonList(insertsAtEpoch0.get(0)), 6), DefaultHoodieRecordPayload.class, true);
- client.upsert(jsc().parallelize(deletesAtEpoch6, 1), commitTimeAtEpoch6);
- validate(metaClient, writeConfig, totalRecords - 1, commitTimeAtEpoch5, DEFAULT_FIRST_PARTITION_PATH, 5);
+ List<HoodieRecord> deletesAtEpoch6 = getDeletes(updatesAtEpoch5.subList(totalRecords - 1, totalRecords), 6, payloadClass);
+ JavaRDD<WriteStatus> writeStatusesAtEpoch6 = client.upsert(jsc().parallelize(deletesAtEpoch6, 1), commitTimeAtEpoch6);
+ assertNoWriteErrors(writeStatusesAtEpoch6.collect());
+ doMergedReadAndValidate(metaClient, writeConfig, totalRecords - 1, partition, 5, payloadClass);
// 4th batch: normal updates
String commitTimeAtEpoch9 = getCommitTimeAtUTC(9);
- List<HoodieRecord> updatesAtEpoch9 = convertPayload(dataGen
- .generateUpdatesWithTimestamp(commitTimeAtEpoch9, insertsAtEpoch0, 9), DefaultHoodieRecordPayload.class);
+ List<HoodieRecord> updatesAtEpoch9 = getUpdates(updatesAtEpoch5, 9, payloadClass);
client.startCommitWithTime(commitTimeAtEpoch9);
client.upsert(jsc().parallelize(updatesAtEpoch9, 1), commitTimeAtEpoch9);
- validate(metaClient, writeConfig, totalRecords, commitTimeAtEpoch9, DEFAULT_FIRST_PARTITION_PATH, 9);
-
+ doMergedReadAndValidate(metaClient, writeConfig, totalRecords, partition, 9, payloadClass);
}
}
+ private static Stream<Arguments> avroPayloadClassesThatHonorOrdering() {
+ return Stream.of(
+ Arguments.of(COPY_ON_WRITE, PartialUpdateAvroPayload.class),
+ Arguments.of(COPY_ON_WRITE, DefaultHoodieRecordPayload.class),
+ Arguments.of(MERGE_ON_READ, PartialUpdateAvroPayload.class),
+ Arguments.of(MERGE_ON_READ, DefaultHoodieRecordPayload.class)
+ );
+ }
+
@ParameterizedTest
- @EnumSource(HoodieTableType.class)
- public void testReadLatestRecordsWithLateArrivedRecords(HoodieTableType tableType) throws IOException {
- HoodieWriteConfig writeConfig = getWriteConfig();
+ @MethodSource("avroPayloadClassesThatHonorOrdering")
+ public void testReadLatestRecordsWithLateArrivedRecords(HoodieTableType tableType, Class<?> payloadClass) throws IOException {
+ HoodieWriteConfig writeConfig = getWriteConfig(payloadClass);
HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType, writeConfig.getProps());
try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
final int totalRecords = 4;
+ final String partition = "foo";
+
// 1st batch: inserts
String commitTimeAtEpoch0 = getCommitTimeAtUTC(0);
- List<HoodieRecord> insertsAtEpoch0 = convertPayload(dataGen
- .generateInsertsForPartition(commitTimeAtEpoch0, totalRecords, DEFAULT_FIRST_PARTITION_PATH), DefaultHoodieRecordPayload.class);
+ List<HoodieRecord> insertsAtEpoch0 = getInserts(totalRecords, partition, 0, payloadClass);
client.startCommitWithTime(commitTimeAtEpoch0);
client.upsert(jsc().parallelize(insertsAtEpoch0, 1), commitTimeAtEpoch0);
- validate(metaClient, writeConfig, totalRecords, commitTimeAtEpoch0, DEFAULT_FIRST_PARTITION_PATH, 0);
+ doMergedReadAndValidate(metaClient, writeConfig, totalRecords, partition, 0, payloadClass);
// 2nd batch: normal updates
String commitTimeAtEpoch5 = getCommitTimeAtUTC(5);
- List<HoodieRecord> updatesAtEpoch5 = convertPayload(dataGen
- .generateUpdatesWithTimestamp(commitTimeAtEpoch5, insertsAtEpoch0, 5), DefaultHoodieRecordPayload.class);
+ List<HoodieRecord> updatesAtEpoch5 = getUpdates(insertsAtEpoch0, 5, payloadClass);
client.startCommitWithTime(commitTimeAtEpoch5);
client.upsert(jsc().parallelize(updatesAtEpoch5, 1), commitTimeAtEpoch5);
- validate(metaClient, writeConfig, totalRecords, commitTimeAtEpoch5, DEFAULT_FIRST_PARTITION_PATH, 5);
+ doMergedReadAndValidate(metaClient, writeConfig, totalRecords, partition, 5, payloadClass);
// 3rd batch: updates with old timestamp will be discarded
String commitTimeAtEpoch6 = getCommitTimeAtUTC(6);
- List<HoodieRecord> updatesAtEpoch1 = convertPayload(dataGen
- .generateUpdatesWithTimestamp(commitTimeAtEpoch6, insertsAtEpoch0, 1), DefaultHoodieRecordPayload.class);
+ List<HoodieRecord> updatesAtEpoch1 = getUpdates(insertsAtEpoch0, 1, payloadClass);
client.startCommitWithTime(commitTimeAtEpoch6);
client.upsert(jsc().parallelize(updatesAtEpoch1, 1), commitTimeAtEpoch6);
- validate(metaClient, writeConfig, totalRecords, commitTimeAtEpoch5, DEFAULT_FIRST_PARTITION_PATH, 5);
+ doMergedReadAndValidate(metaClient, writeConfig, totalRecords, partition, 5, payloadClass);
// 4th batch: normal updates
String commitTimeAtEpoch9 = getCommitTimeAtUTC(9);
- List<HoodieRecord> updatesAtEpoch9 = convertPayload(dataGen
- .generateUpdatesWithTimestamp(commitTimeAtEpoch9, insertsAtEpoch0, 9), DefaultHoodieRecordPayload.class);
+ List<HoodieRecord> updatesAtEpoch9 = getUpdates(updatesAtEpoch5, 9, payloadClass);
client.startCommitWithTime(commitTimeAtEpoch9);
client.upsert(jsc().parallelize(updatesAtEpoch9, 1), commitTimeAtEpoch9);
- validate(metaClient, writeConfig, totalRecords, commitTimeAtEpoch9, DEFAULT_FIRST_PARTITION_PATH, 9);
+ doMergedReadAndValidate(metaClient, writeConfig, totalRecords, partition, 9, payloadClass);
}
}
- private void validate(HoodieTableMetaClient metaClient, HoodieWriteConfig writeConfig,
- int totalRecords, String commitTime, String partition, long timestamp) throws IOException {
+ private void doMergedReadAndValidate(HoodieTableMetaClient metaClient, HoodieWriteConfig writeConfig,
+ int totalRecords, String partition, long timestamp, Class<?> payloadClass) throws IOException {
+ String orderingField = new HoodieAdaptablePayloadDataGenerator.RecordGen(payloadClass).getOrderingField();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(writeConfig, context(), metaClient);
List<Pair<String, String>> partitionPathAndFileIDPairs = table.getHoodieView()
@@ -156,58 +185,38 @@ public class TestHoodieMergedReadHandle extends SparkClientFunctionalTestHarness
HoodieMergedReadHandle mergedReadHandle = new HoodieMergedReadHandle<>(writeConfig, Option.of(latestCommitTime), table, partitionPathAndFileIDPairs.get(0));
List<HoodieRecord> mergedRecords = mergedReadHandle.getMergedRecords();
assertEquals(totalRecords, mergedRecords.size());
- for (HoodieRecord rec : mergedRecords) {
- HoodieRecord r = rec.wrapIntoHoodieRecordPayloadWithParams(
- addMetadataFields(AVRO_SCHEMA, writeConfig.allowOperationMetadataField()),
- writeConfig.getProps(), Option.empty(), writeConfig.allowOperationMetadataField(), Option.empty(), false, Option.of(AVRO_SCHEMA));
+ for (int i = 0; i < mergedRecords.size(); i++) {
+ HoodieRecord r = mergedRecords.get(i);
+ mergedRecords.set(i, r.wrapIntoHoodieRecordPayloadWithParams(
+ SCHEMA_WITH_METAFIELDS, writeConfig.getProps(), Option.empty(), writeConfig.allowOperationMetadataField(),
+ Option.empty(), false, Option.of(SCHEMA)));
+ }
+ List<HoodieRecord> sortedMergedRecords = mergedRecords.stream()
+ .sorted(Comparator.comparing(HoodieRecord::getRecordKey)).collect(Collectors.toList());
+ for (int i = 0; i < sortedMergedRecords.size(); i++) {
+ HoodieRecord r = sortedMergedRecords.get(i);
+ assertEquals(i, Integer.parseInt(r.getRecordKey()));
assertEquals(partition, r.getPartitionPath());
- assertEquals(DefaultHoodieRecordPayload.class, r.getData().getClass());
- DefaultHoodieRecordPayload data = (DefaultHoodieRecordPayload) r.getData();
- assertEquals(timestamp, data.getOrderingVal());
- Option<IndexedRecord> valueOpt = data.getInsertValue(AVRO_SCHEMA);
+ assertEquals(payloadClass.getName(), r.getData().getClass().getName());
+ Option<IndexedRecord> valueOpt = ((HoodieRecordPayload) r.getData()).getInsertValue(SCHEMA);
assertTrue(valueOpt.isPresent());
GenericRecord avroValue = (GenericRecord) valueOpt.get();
- assertEquals("rider-" + commitTime, avroValue.get("rider").toString());
- assertEquals("driver-" + commitTime, avroValue.get("driver").toString());
- assertEquals(timestamp, Long.parseLong(avroValue.get("timestamp").toString()));
- }
- }
-
- private List<HoodieRecord> convertPayload(List<HoodieRecord> records, Class<?> payloadClazz) throws IOException {
- return convertPayload(records, payloadClazz, false);
- }
-
- private List<HoodieRecord> convertPayload(List<HoodieRecord> records, Class<?> payloadClazz, boolean isDeleted) throws IOException {
- List<HoodieRecord> convertedRecords = new ArrayList<>();
- for (HoodieRecord r : records) {
- GenericRecord avroData = (GenericRecord) ((RawTripTestPayload) r.getData()).getRecordToInsert(AVRO_SCHEMA);
- avroData.put("_hoodie_is_deleted", isDeleted);
- convertedRecords.add(
- createHoodieRecordFromAvro(avroData, payloadClazz.getName(),
- "timestamp", Option.of(Pair.of("_row_key", "partition_path")),
- false, Option.empty(), false, Option.of(AVRO_SCHEMA)));
+ assertEquals(i, Integer.parseInt(avroValue.get("id").toString()));
+ assertEquals(partition, avroValue.get("pt").toString());
+ assertEquals(timestamp, Long.parseLong(avroValue.get(orderingField).toString()));
}
- return convertedRecords;
}
- private HoodieWriteConfig getWriteConfig() {
- Properties properties = new Properties();
- properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
- properties.put("hoodie.datasource.write.partitionpath.field",
"partition_path");
- properties.put("hoodie.datasource.write.precombine.field", "timestamp");
- properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
- properties.put(HoodieTableConfig.PARTITION_FIELDS.key(), "partition_path");
- properties.put(HoodieTableConfig.PRECOMBINE_FIELD.key(), "timestamp");
+ private HoodieWriteConfig getWriteConfig(Class<?> payloadClass) {
return getConfigBuilder(true)
- .withProperties(properties)
+ .withProperties(getKeyGenProps(payloadClass))
.withParallelism(2, 2)
.withBulkInsertParallelism(2)
.withDeleteParallelism(1)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
- .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ .withSchema(SCHEMA_STR)
.withPayloadConfig(HoodiePayloadConfig.newBuilder()
- .withPayloadClass(DefaultHoodieRecordPayload.class.getName())
- .withPayloadOrderingField("timestamp").build())
+ .fromProperties(getPayloadProps(payloadClass)).build())
.build();
}
}
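For readers unfamiliar with the JUnit 5 idiom adopted above: @EnumSource feeds one enum value per invocation, while @MethodSource pulls whole argument tuples from a static factory, which is what lets the table type and payload class vary together. A minimal, self-contained sketch of the same pattern (the class and method names here are illustrative, not part of this patch):

    import java.util.stream.Stream;
    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.Arguments;
    import org.junit.jupiter.params.provider.MethodSource;
    import static org.junit.jupiter.api.Assertions.assertNotNull;

    public class MethodSourceSketch {
      // Each Arguments tuple becomes one test invocation, mirroring avroPayloadClasses().
      static Stream<Arguments> tableTypeAndPayload() {
        return Stream.of(
            Arguments.of("COPY_ON_WRITE", Long.class),
            Arguments.of("MERGE_ON_READ", Long.class));
      }

      @ParameterizedTest
      @MethodSource("tableTypeAndPayload")
      void runsOncePerTuple(String tableType, Class<?> payloadClass) {
        assertNotNull(tableType);
        assertNotNull(payloadClass);
      }
    }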
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieAdaptablePayloadDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieAdaptablePayloadDataGenerator.java
new file mode 100644
index 00000000000..e403a9a3681
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieAdaptablePayloadDataGenerator.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.DebeziumConstants;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.hudi.common.model.HoodieRecord.HOODIE_IS_DELETED_FIELD;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
+public class HoodieAdaptablePayloadDataGenerator {
+
+ public static final Schema SCHEMA = SchemaTestUtil.getSchemaFromResource(HoodieAdaptablePayloadDataGenerator.class, "/adaptable-payload.avsc");
+ public static final Schema SCHEMA_WITH_METAFIELDS = HoodieAvroUtils.addMetadataFields(SCHEMA, false);
+ public static final String SCHEMA_STR = SCHEMA.toString();
+
+ public static Properties getKeyGenProps(Class<?> payloadClass) {
+ String orderingField = new RecordGen(payloadClass).getOrderingField();
+ Properties props = new Properties();
+ props.put("hoodie.datasource.write.recordkey.field", "id");
+ props.put("hoodie.datasource.write.partitionpath.field", "pt");
+ props.put("hoodie.datasource.write.precombine.field", orderingField);
+ props.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "id");
+ props.put(HoodieTableConfig.PARTITION_FIELDS.key(), "pt");
+ props.put(HoodieTableConfig.PRECOMBINE_FIELD.key(), orderingField);
+ return props;
+ }
+
+ public static Properties getPayloadProps(Class<?> payloadClass) {
+ String orderingField = new RecordGen(payloadClass).getOrderingField();
+ Properties props = new Properties();
+ props.put("hoodie.compaction.payload.class", payloadClass.getName());
+ props.put("hoodie.payload.event.time.field", orderingField);
+ props.put("hoodie.payload.ordering.field", orderingField);
+ return props;
+ }
+
+ public static List<HoodieRecord> getInserts(int n, String partition, long ts, Class<?> payloadClass) throws IOException {
+ return getInserts(n, new String[] {partition}, ts, payloadClass);
+ }
+
+ public static List<HoodieRecord> getInserts(int n, String[] partitions, long ts, Class<?> payloadClass) throws IOException {
+ List<HoodieRecord> inserts = new ArrayList<>();
+ RecordGen recordGen = new RecordGen(payloadClass);
+ for (GenericRecord r : getInserts(n, partitions, ts, recordGen)) {
+ inserts.add(getHoodieRecord(r, recordGen.getPayloadClass()));
+ }
+ return inserts;
+ }
+
+ private static List<GenericRecord> getInserts(int n, String[] partitions, long ts, RecordGen recordGen) {
+ return IntStream.range(0, n).mapToObj(id -> {
+ String pt = partitions.length == 0 ? "" : partitions[id % partitions.length];
+ return getInsert(id, pt, ts, recordGen);
+ }).collect(Collectors.toList());
+ }
+
+ private static GenericRecord getInsert(int id, String pt, long ts, RecordGen recordGen) {
+ GenericRecord r = new GenericData.Record(SCHEMA);
+ r.put("id", id);
+ r.put("pt", pt);
+ return recordGen.populateForInsert(r, ts);
+ }
+
+ public static List<HoodieRecord> getUpdates(List<HoodieRecord> baseRecords, long ts, Class<?> payloadClass) throws IOException {
+ RecordGen recordGen = new RecordGen(payloadClass);
+ List<HoodieRecord> updates = new ArrayList<>();
+ Properties props = new Properties();
+ for (HoodieRecord r : baseRecords) {
+ GenericRecord gr = (GenericRecord) r.toIndexedRecord(SCHEMA, props).get().getData();
+ GenericRecord updated = getUpdate(Integer.parseInt(gr.get("id").toString()), gr.get("pt").toString(), ts, recordGen);
+ updates.add(getHoodieRecord(updated, recordGen.getPayloadClass()));
+ }
+ return updates;
+ }
+
+ private static GenericRecord getUpdate(int id, String pt, long ts, RecordGen recordGen) {
+ GenericRecord r = new GenericData.Record(SCHEMA);
+ r.put("id", id);
+ r.put("pt", pt);
+ return recordGen.populateForUpdate(r, ts);
+ }
+
+ public static List<HoodieRecord> getDeletes(List<HoodieRecord> baseRecords, long ts, Class<?> payloadClass) throws IOException {
+ RecordGen recordGen = new RecordGen(payloadClass);
+ List<HoodieRecord> deletes = new ArrayList<>();
+ Properties props = new Properties();
+ for (HoodieRecord r : baseRecords) {
+ GenericRecord gr = (GenericRecord) r.toIndexedRecord(SCHEMA, props).get().getData();
+ GenericRecord deleted = getDelete(Integer.parseInt(gr.get("id").toString()), gr.get("pt").toString(), ts, recordGen);
+ deletes.add(getHoodieRecord(deleted, recordGen.getPayloadClass()));
+ }
+ return deletes;
+ }
+
+ private static GenericRecord getDelete(int id, String pt, long ts, RecordGen recordGen) {
+ GenericRecord r = new GenericData.Record(SCHEMA);
+ r.put("id", id);
+ r.put("pt", pt);
+ return recordGen.populateForDelete(r, ts);
+ }
+
+ private static HoodieRecord getHoodieRecord(GenericRecord r, Class<?> payloadClass) throws IOException {
+ return new HoodieAvroIndexedRecord(r)
+ .prependMetaFields(
+ SCHEMA,
+ SCHEMA_WITH_METAFIELDS,
+ new MetadataValues().setRecordKey(r.get("id").toString()).setPartitionPath(r.get("pt").toString()),
+ new Properties())
+ .wrapIntoHoodieRecordPayloadWithParams(
+ SCHEMA_WITH_METAFIELDS,
+ getPayloadProps(payloadClass),
+ Option.empty(),
+ false,
+ Option.empty(),
+ false,
+ Option.of(SCHEMA));
+ }
+
+ public static class RecordGen {
+
+ public static final Set<Class<?>> SUPPORTED_PAYLOAD_CLASSES = new HashSet<>(Arrays.asList(
+ OverwriteWithLatestAvroPayload.class,
+ OverwriteNonDefaultsWithLatestAvroPayload.class,
+ PartialUpdateAvroPayload.class,
+ DefaultHoodieRecordPayload.class,
+ AWSDmsAvroPayload.class,
+ MySqlDebeziumAvroPayload.class,
+ PostgresDebeziumAvroPayload.class
+ ));
+
+ private final Class<?> payloadClass;
+ private final String orderingField;
+
+ public RecordGen(Class<?> payloadClass) {
+ checkArgument(SUPPORTED_PAYLOAD_CLASSES.contains(payloadClass));
+ this.payloadClass = payloadClass;
+ if (payloadClass == MySqlDebeziumAvroPayload.class) {
+ orderingField = "_event_seq";
+ } else if (payloadClass == PostgresDebeziumAvroPayload.class) {
+ orderingField = "_event_lsn";
+ } else {
+ orderingField = "ts";
+ }
+ }
+
+ public Class<?> getPayloadClass() {
+ return payloadClass;
+ }
+
+ public String getOrderingField() {
+ return orderingField;
+ }
+
+ GenericRecord populateForInsert(GenericRecord r, long ts) {
+ r.put(orderingField, ts);
+ if (payloadClass == AWSDmsAvroPayload.class) {
+ r.put(AWSDmsAvroPayload.OP_FIELD, "I");
+ }
+ return r;
+ }
+
+ GenericRecord populateForUpdate(GenericRecord r, long ts) {
+ r.put(orderingField, ts);
+ if (payloadClass == AWSDmsAvroPayload.class) {
+ r.put(AWSDmsAvroPayload.OP_FIELD, "U");
+ }
+ return r;
+ }
+
+ GenericRecord populateForDelete(GenericRecord r, long ts) {
+ r.put(orderingField, ts);
+ if (payloadClass == MySqlDebeziumAvroPayload.class) {
+ r.put(DebeziumConstants.FLATTENED_OP_COL_NAME, DebeziumConstants.DELETE_OP);
+ } else if (payloadClass == PostgresDebeziumAvroPayload.class) {
+ r.put(DebeziumConstants.FLATTENED_OP_COL_NAME, DebeziumConstants.DELETE_OP);
+ } else if (payloadClass == AWSDmsAvroPayload.class) {
+ r.put(AWSDmsAvroPayload.OP_FIELD, "D");
+ } else {
+ r.put(HOODIE_IS_DELETED_FIELD, true);
+ }
+ return r;
+ }
+ }
+}
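The generator above is the core of the patch: getInserts/getUpdates/getDeletes emit HoodieRecords with the payload class chosen at call time, and RecordGen picks the ordering field and deletion marker appropriate to each payload family. A hedged usage sketch, using only signatures visible in this diff (the surrounding write-client plumbing is assumed to come from a test harness, and the static imports match those in the test file above):

    // 4 inserts into partition "foo" at ts=0, then update all at ts=5,
    // then delete only the last record at ts=6: the shape of the test above.
    List<HoodieRecord> inserts = getInserts(4, "foo", 0L, DefaultHoodieRecordPayload.class);
    List<HoodieRecord> updates = getUpdates(inserts, 5L, DefaultHoodieRecordPayload.class);
    List<HoodieRecord> deletes = getDeletes(updates.subList(3, 4), 6L, DefaultHoodieRecordPayload.class);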
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
index 0553f31f616..f9a67a13710 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
@@ -20,12 +20,14 @@
package org.apache.hudi.common.testutils;
import org.apache.hudi.avro.MercifulJsonConverter;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
@@ -39,6 +41,7 @@ import org.apache.avro.specific.SpecificDatumWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -47,6 +50,9 @@ import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
+import static org.apache.hudi.avro.HoodieAvroUtils.createHoodieRecordFromAvro;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+
/**
* Example row change event based on some example data used by testcases. The data avro schema is
* src/test/resources/schema1.
@@ -158,6 +164,23 @@ public class RawTripTestPayload implements HoodieRecordPayload<RawTripTestPayloa
.collect(Collectors.toList());
}
+ public static List<HoodieRecord> asDefaultPayloadRecords(List<HoodieRecord> records) throws IOException {
+ return asDefaultPayloadRecords(records, false);
+ }
+
+ public static List<HoodieRecord> asDefaultPayloadRecords(List<HoodieRecord> records, boolean isDeleted) throws IOException {
+ List<HoodieRecord> convertedRecords = new ArrayList<>();
+ for (HoodieRecord r : records) {
+ GenericRecord avroData = (GenericRecord) ((RawTripTestPayload) r.getData()).getRecordToInsert(AVRO_SCHEMA);
+ avroData.put("_hoodie_is_deleted", isDeleted);
+ convertedRecords.add(
+ createHoodieRecordFromAvro(avroData, DefaultHoodieRecordPayload.class.getName(),
+ "timestamp", Option.of(Pair.of("_row_key", "partition_path")),
+ false, Option.empty(), false, Option.of(AVRO_SCHEMA)));
+ }
+ return convertedRecords;
+ }
+
public String getPartitionPath() {
return partitionPath;
}
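The new asDefaultPayloadRecords helper lifts the conversion that the test class previously did inline, so any test that generates RawTripTestPayload records via HoodieTestDataGenerator can re-wrap them as DefaultHoodieRecordPayload in one call. A sketch of the intended call site, reusing the generateInsertsForPartition signature from the deleted test code (the commit time and partition values here are illustrative):

    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
    List<HoodieRecord> raw = dataGen.generateInsertsForPartition("000", 4, "2016/03/15");
    // Wrap as DefaultHoodieRecordPayload with _hoodie_is_deleted = false:
    List<HoodieRecord> upserts = RawTripTestPayload.asDefaultPayloadRecords(raw);
    // Or mark every record as a delete:
    List<HoodieRecord> deletes = RawTripTestPayload.asDefaultPayloadRecords(raw, true);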
diff --git a/hudi-common/src/test/resources/adaptable-payload.avsc b/hudi-common/src/test/resources/adaptable-payload.avsc
new file mode 100644
index 00000000000..68e48e11afa
--- /dev/null
+++ b/hudi-common/src/test/resources/adaptable-payload.avsc
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+{
+ "type": "record",
+ "name": "PayloadAdaptableRecord",
+ "namespace": "hoodie",
+ "fields": [
+ {
+ "name": "id",
+ "type": "int"
+ },
+ {
+ "name": "pt",
+ "type": "string"
+ },
+ {
+ "name": "ts",
+ "type": [
+ "null",
+ "long"
+ ],
+ "default": null
+ },
+ {
+ "name": "Op",
+ "type": [
+ "null",
+ "string"
+ ],
+ "default": null
+ },
+ {
+ "name": "_change_operation_type",
+ "type": [
+ "null",
+ "string"
+ ],
+ "default": null
+ },
+ {
+ "name": "_hoodie_is_deleted",
+ "type": [
+ "null",
+ "boolean"
+ ],
+ "default": null
+ },
+ {
+ "name": "_event_seq",
+ "type": [
+ "null",
+ "long"
+ ],
+ "default": null
+ },
+ {
+ "name": "_event_lsn",
+ "type": [
+ "null",
+ "long"
+ ],
+ "default": null
+ }
+ ]
+}
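One detail worth noting about this resource: it carries a C-style license header inside an .avsc file, which strict JSON parsers reject but which Avro's Schema.Parser is expected to tolerate (its Jackson factory allows comments); SchemaTestUtil.getSchemaFromResource loads it above. A minimal check, assuming the file sits on the test classpath:

    import java.io.InputStream;
    import org.apache.avro.Schema;

    public class AdaptableSchemaCheck {
      public static void main(String[] args) throws Exception {
        try (InputStream in = AdaptableSchemaCheck.class.getResourceAsStream("/adaptable-payload.avsc")) {
          // The parser should skip the /* ... */ header and yield the 8-field record.
          Schema schema = new Schema.Parser().parse(in);
          System.out.println(schema.getName() + " has " + schema.getFields().size() + " fields");
        }
      }
    }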