This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 19d1094a73b [HUDI-6195] Test-cover different payload classes when use HoodieMergedReadHandle (#8675)
19d1094a73b is described below

commit 19d1094a73b845fcd2ac884dafb6c8444d161bbc
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri May 12 14:10:11 2023 +0800

    [HUDI-6195] Test-cover different payload classes when use HoodieMergedReadHandle (#8675)
---
 .../apache/hudi/io/TestHoodieMergedReadHandle.java | 187 +++++++++--------
 .../HoodieAdaptablePayloadDataGenerator.java       | 228 +++++++++++++++++++++
 .../hudi/common/testutils/RawTripTestPayload.java  |  23 +++
 .../src/test/resources/adaptable-payload.avsc      |  82 ++++++++
 4 files changed, 431 insertions(+), 89 deletions(-)
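
For orientation before the full diff: a minimal sketch of how the new HoodieAdaptablePayloadDataGenerator helpers are driven, mirroring the insert/update/delete flow of the reworked test below. The sketch class name and main() wrapper are illustrative; the helper signatures are taken from the patch itself.

    import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
    import org.apache.hudi.common.model.HoodieRecord;

    import java.io.IOException;
    import java.util.List;

    import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getDeletes;
    import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getInserts;
    import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getUpdates;

    // Illustrative wrapper class; not part of the commit.
    public class AdaptablePayloadUsageSketch {
      public static void main(String[] args) throws IOException {
        Class<?> payloadClass = DefaultHoodieRecordPayload.class;
        // 4 inserts into partition "foo" with ordering value 0
        List<HoodieRecord> inserts = getInserts(4, "foo", 0, payloadClass);
        // updates for the same keys at a higher ordering value
        List<HoodieRecord> updates = getUpdates(inserts, 5, payloadClass);
        // delete only the last record at ordering value 6
        List<HoodieRecord> deletes = getDeletes(updates.subList(3, 4), 6, payloadClass);
        System.out.println(inserts.size() + " inserts, " + updates.size() + " updates, " + deletes.size() + " deletes");
      }
    }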

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java
index 552dcb08a6d..6a93e5499c4 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java
@@ -20,14 +20,17 @@
 package org.apache.hudi.io;
 
 import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.testutils.RawTripTestPayload;
+import org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodiePayloadConfig;
@@ -38,113 +41,139 @@ import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
-import java.util.Properties;
 import java.util.stream.Collectors;
-
-import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
-import static org.apache.hudi.avro.HoodieAvroUtils.createHoodieRecordFromAvro;
-import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
-import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.SCHEMA;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.SCHEMA_STR;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.SCHEMA_WITH_METAFIELDS;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getDeletes;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getInserts;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getKeyGenProps;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getPayloadProps;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getUpdates;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieMergedReadHandle extends SparkClientFunctionalTestHarness {
 
-  private HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+  private static Stream<Arguments> avroPayloadClasses() {
+    return Stream.of(
+        Arguments.of(COPY_ON_WRITE, OverwriteWithLatestAvroPayload.class),
+        Arguments.of(COPY_ON_WRITE, OverwriteNonDefaultsWithLatestAvroPayload.class),
+        Arguments.of(COPY_ON_WRITE, PartialUpdateAvroPayload.class),
+        Arguments.of(COPY_ON_WRITE, DefaultHoodieRecordPayload.class),
+        Arguments.of(MERGE_ON_READ, OverwriteWithLatestAvroPayload.class),
+        Arguments.of(MERGE_ON_READ, OverwriteNonDefaultsWithLatestAvroPayload.class),
+        Arguments.of(MERGE_ON_READ, PartialUpdateAvroPayload.class),
+        Arguments.of(MERGE_ON_READ, DefaultHoodieRecordPayload.class)
+    );
+  }
 
   @ParameterizedTest
-  @EnumSource(HoodieTableType.class)
-  public void testReadLatestRecordsWithDeletes(HoodieTableType tableType) throws IOException {
-    HoodieWriteConfig writeConfig = getWriteConfig();
+  @MethodSource("avroPayloadClasses")
+  public void testReadLatestRecordsWithDeletes(HoodieTableType tableType, Class<?> payloadClass) throws IOException {
+    HoodieWriteConfig writeConfig = getWriteConfig(payloadClass);
     HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType, writeConfig.getProps());
     try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
       final int totalRecords = 4;
+      final String partition = "foo";
+
       // 1st batch: inserts
       String commitTimeAtEpoch0 = getCommitTimeAtUTC(0);
-      List<HoodieRecord> insertsAtEpoch0 = convertPayload(dataGen
-          .generateInsertsForPartition(commitTimeAtEpoch0, totalRecords, DEFAULT_FIRST_PARTITION_PATH), DefaultHoodieRecordPayload.class);
+      List<HoodieRecord> insertsAtEpoch0 = getInserts(totalRecords, partition, 0, payloadClass);
       client.startCommitWithTime(commitTimeAtEpoch0);
       client.upsert(jsc().parallelize(insertsAtEpoch0, 1), commitTimeAtEpoch0);
-      validate(metaClient, writeConfig, totalRecords, commitTimeAtEpoch0, DEFAULT_FIRST_PARTITION_PATH, 0);
+      doMergedReadAndValidate(metaClient, writeConfig, totalRecords, partition, 0, payloadClass);
 
       // 2nd batch: normal updates
       String commitTimeAtEpoch5 = getCommitTimeAtUTC(5);
-      List<HoodieRecord> updatesAtEpoch5 = convertPayload(dataGen
-          .generateUpdatesWithTimestamp(commitTimeAtEpoch5, insertsAtEpoch0, 5), DefaultHoodieRecordPayload.class);
+      List<HoodieRecord> updatesAtEpoch5 = getUpdates(insertsAtEpoch0, 5, payloadClass);
       client.startCommitWithTime(commitTimeAtEpoch5);
-      client.upsert(jsc().parallelize(updatesAtEpoch5, 1), commitTimeAtEpoch5);
-      client.upsert(jsc().parallelize(updatesAtEpoch5, 1), commitTimeAtEpoch5);
-      validate(metaClient, writeConfig, totalRecords, commitTimeAtEpoch5, DEFAULT_FIRST_PARTITION_PATH, 5);
+      JavaRDD<WriteStatus> writeStatusesAtEpoch5 = client.upsert(jsc().parallelize(updatesAtEpoch5, 1), commitTimeAtEpoch5);
+      assertNoWriteErrors(writeStatusesAtEpoch5.collect());
+      doMergedReadAndValidate(metaClient, writeConfig, totalRecords, partition, 5, payloadClass);
 
-      // 3rd batch: delete a record
+      // 3rd batch: delete the record with id 3 (the last one)
       String commitTimeAtEpoch6 = getCommitTimeAtUTC(6);
       client.startCommitWithTime(commitTimeAtEpoch6);
-      List<HoodieRecord> deletesAtEpoch6 = convertPayload(dataGen
-          .generateUpdatesWithTimestamp(commitTimeAtEpoch6, Collections.singletonList(insertsAtEpoch0.get(0)), 6), DefaultHoodieRecordPayload.class, true);
-      client.upsert(jsc().parallelize(deletesAtEpoch6, 1), commitTimeAtEpoch6);
-      validate(metaClient, writeConfig, totalRecords - 1, commitTimeAtEpoch5, DEFAULT_FIRST_PARTITION_PATH, 5);
+      List<HoodieRecord> deletesAtEpoch6 = getDeletes(updatesAtEpoch5.subList(totalRecords - 1, totalRecords), 6, payloadClass);
+      JavaRDD<WriteStatus> writeStatusesAtEpoch6 = client.upsert(jsc().parallelize(deletesAtEpoch6, 1), commitTimeAtEpoch6);
+      assertNoWriteErrors(writeStatusesAtEpoch6.collect());
+      doMergedReadAndValidate(metaClient, writeConfig, totalRecords - 1, partition, 5, payloadClass);
 
       // 4th batch: normal updates
       String commitTimeAtEpoch9 = getCommitTimeAtUTC(9);
-      List<HoodieRecord> updatesAtEpoch9 = convertPayload(dataGen
-          .generateUpdatesWithTimestamp(commitTimeAtEpoch9, insertsAtEpoch0, 9), DefaultHoodieRecordPayload.class);
+      List<HoodieRecord> updatesAtEpoch9 = getUpdates(updatesAtEpoch5, 9, payloadClass);
       client.startCommitWithTime(commitTimeAtEpoch9);
       client.upsert(jsc().parallelize(updatesAtEpoch9, 1), commitTimeAtEpoch9);
-      validate(metaClient, writeConfig, totalRecords, commitTimeAtEpoch9, DEFAULT_FIRST_PARTITION_PATH, 9);
-
+      doMergedReadAndValidate(metaClient, writeConfig, totalRecords, partition, 9, payloadClass);
     }
   }
 
+  private static Stream<Arguments> avroPayloadClassesThatHonorOrdering() {
+    return Stream.of(
+        Arguments.of(COPY_ON_WRITE, PartialUpdateAvroPayload.class),
+        Arguments.of(COPY_ON_WRITE, DefaultHoodieRecordPayload.class),
+        Arguments.of(MERGE_ON_READ, PartialUpdateAvroPayload.class),
+        Arguments.of(MERGE_ON_READ, DefaultHoodieRecordPayload.class)
+    );
+  }
+
   @ParameterizedTest
-  @EnumSource(HoodieTableType.class)
-  public void testReadLatestRecordsWithLateArrivedRecords(HoodieTableType tableType) throws IOException {
-    HoodieWriteConfig writeConfig = getWriteConfig();
+  @MethodSource("avroPayloadClassesThatHonorOrdering")
+  public void testReadLatestRecordsWithLateArrivedRecords(HoodieTableType tableType, Class<?> payloadClass) throws IOException {
+    HoodieWriteConfig writeConfig = getWriteConfig(payloadClass);
     HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType, writeConfig.getProps());
     try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
       final int totalRecords = 4;
+      final String partition = "foo";
+
       // 1st batch: inserts
       String commitTimeAtEpoch0 = getCommitTimeAtUTC(0);
-      List<HoodieRecord> insertsAtEpoch0 = convertPayload(dataGen
-          .generateInsertsForPartition(commitTimeAtEpoch0, totalRecords, DEFAULT_FIRST_PARTITION_PATH), DefaultHoodieRecordPayload.class);
+      List<HoodieRecord> insertsAtEpoch0 = getInserts(totalRecords, partition, 0, payloadClass);
       client.startCommitWithTime(commitTimeAtEpoch0);
       client.upsert(jsc().parallelize(insertsAtEpoch0, 1), commitTimeAtEpoch0);
-      validate(metaClient, writeConfig, totalRecords, commitTimeAtEpoch0, DEFAULT_FIRST_PARTITION_PATH, 0);
+      doMergedReadAndValidate(metaClient, writeConfig, totalRecords, partition, 0, payloadClass);
 
       // 2nd batch: normal updates
       String commitTimeAtEpoch5 = getCommitTimeAtUTC(5);
-      List<HoodieRecord> updatesAtEpoch5 = convertPayload(dataGen
-          .generateUpdatesWithTimestamp(commitTimeAtEpoch5, insertsAtEpoch0, 5), DefaultHoodieRecordPayload.class);
+      List<HoodieRecord> updatesAtEpoch5 = getUpdates(insertsAtEpoch0, 5, payloadClass);
       client.startCommitWithTime(commitTimeAtEpoch5);
       client.upsert(jsc().parallelize(updatesAtEpoch5, 1), commitTimeAtEpoch5);
-      validate(metaClient, writeConfig, totalRecords, commitTimeAtEpoch5, DEFAULT_FIRST_PARTITION_PATH, 5);
+      doMergedReadAndValidate(metaClient, writeConfig, totalRecords, partition, 5, payloadClass);
 
       // 3rd batch: updates with old timestamp will be discarded
       String commitTimeAtEpoch6 = getCommitTimeAtUTC(6);
-      List<HoodieRecord> updatesAtEpoch1 = convertPayload(dataGen
-          .generateUpdatesWithTimestamp(commitTimeAtEpoch6, insertsAtEpoch0, 1), DefaultHoodieRecordPayload.class);
+      List<HoodieRecord> updatesAtEpoch1 = getUpdates(insertsAtEpoch0, 1, payloadClass);
       client.startCommitWithTime(commitTimeAtEpoch6);
       client.upsert(jsc().parallelize(updatesAtEpoch1, 1), commitTimeAtEpoch6);
-      validate(metaClient, writeConfig, totalRecords, commitTimeAtEpoch5, DEFAULT_FIRST_PARTITION_PATH, 5);
+      doMergedReadAndValidate(metaClient, writeConfig, totalRecords, partition, 5, payloadClass);
 
       // 4th batch: normal updates
       String commitTimeAtEpoch9 = getCommitTimeAtUTC(9);
-      List<HoodieRecord> updatesAtEpoch9 = convertPayload(dataGen
-          .generateUpdatesWithTimestamp(commitTimeAtEpoch9, insertsAtEpoch0, 9), DefaultHoodieRecordPayload.class);
+      List<HoodieRecord> updatesAtEpoch9 = getUpdates(updatesAtEpoch5, 9, payloadClass);
       client.startCommitWithTime(commitTimeAtEpoch9);
       client.upsert(jsc().parallelize(updatesAtEpoch9, 1), commitTimeAtEpoch9);
-      validate(metaClient, writeConfig, totalRecords, commitTimeAtEpoch9, DEFAULT_FIRST_PARTITION_PATH, 9);
+      doMergedReadAndValidate(metaClient, writeConfig, totalRecords, partition, 9, payloadClass);
     }
   }
 
-  private void validate(HoodieTableMetaClient metaClient, HoodieWriteConfig writeConfig,
-      int totalRecords, String commitTime, String partition, long timestamp) throws IOException {
+  private void doMergedReadAndValidate(HoodieTableMetaClient metaClient, HoodieWriteConfig writeConfig,
+      int totalRecords, String partition, long timestamp, Class<?> payloadClass) throws IOException {
+    String orderingField = new HoodieAdaptablePayloadDataGenerator.RecordGen(payloadClass).getOrderingField();
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable table = HoodieSparkTable.create(writeConfig, context(), metaClient);
     List<Pair<String, String>> partitionPathAndFileIDPairs = table.getHoodieView()
@@ -156,58 +185,38 @@ public class TestHoodieMergedReadHandle extends SparkClientFunctionalTestHarness
     HoodieMergedReadHandle mergedReadHandle = new HoodieMergedReadHandle<>(writeConfig, Option.of(latestCommitTime), table, partitionPathAndFileIDPairs.get(0));
     List<HoodieRecord> mergedRecords = mergedReadHandle.getMergedRecords();
     assertEquals(totalRecords, mergedRecords.size());
-    for (HoodieRecord rec : mergedRecords) {
-      HoodieRecord r = rec.wrapIntoHoodieRecordPayloadWithParams(
-          addMetadataFields(AVRO_SCHEMA, writeConfig.allowOperationMetadataField()),
-          writeConfig.getProps(), Option.empty(), writeConfig.allowOperationMetadataField(), Option.empty(), false, Option.of(AVRO_SCHEMA));
+    for (int i = 0; i < mergedRecords.size(); i++) {
+      HoodieRecord r = mergedRecords.get(i);
+      mergedRecords.set(i, r.wrapIntoHoodieRecordPayloadWithParams(
+          SCHEMA_WITH_METAFIELDS, writeConfig.getProps(), Option.empty(), writeConfig.allowOperationMetadataField(),
+          Option.empty(), false, Option.of(SCHEMA)));
+    }
+    List<HoodieRecord> sortedMergedRecords = mergedRecords.stream()
+        .sorted(Comparator.comparing(HoodieRecord::getRecordKey)).collect(Collectors.toList());
+    for (int i = 0; i < sortedMergedRecords.size(); i++) {
+      HoodieRecord r = sortedMergedRecords.get(i);
+      assertEquals(i, Integer.parseInt(r.getRecordKey()));
       assertEquals(partition, r.getPartitionPath());
-      assertEquals(DefaultHoodieRecordPayload.class, r.getData().getClass());
-    DefaultHoodieRecordPayload data = (DefaultHoodieRecordPayload) r.getData();
-      assertEquals(timestamp, data.getOrderingVal());
-      Option<IndexedRecord> valueOpt = data.getInsertValue(AVRO_SCHEMA);
+      assertEquals(payloadClass.getName(), r.getData().getClass().getName());
+      Option<IndexedRecord> valueOpt = ((HoodieRecordPayload) r.getData()).getInsertValue(SCHEMA);
       assertTrue(valueOpt.isPresent());
       GenericRecord avroValue = (GenericRecord) valueOpt.get();
-      assertEquals("rider-" + commitTime, avroValue.get("rider").toString());
-      assertEquals("driver-" + commitTime, avroValue.get("driver").toString());
-      assertEquals(timestamp, Long.parseLong(avroValue.get("timestamp").toString()));
-    }
-  }
-
-  private List<HoodieRecord> convertPayload(List<HoodieRecord> records, Class<?> payloadClazz) throws IOException {
-    return convertPayload(records, payloadClazz, false);
-  }
-
-  private List<HoodieRecord> convertPayload(List<HoodieRecord> records, Class<?> payloadClazz, boolean isDeleted) throws IOException {
-    List<HoodieRecord> convertedRecords = new ArrayList<>();
-    for (HoodieRecord r : records) {
-      GenericRecord avroData = (GenericRecord) ((RawTripTestPayload) r.getData()).getRecordToInsert(AVRO_SCHEMA);
-      avroData.put("_hoodie_is_deleted", isDeleted);
-      convertedRecords.add(
-          createHoodieRecordFromAvro(avroData, payloadClazz.getName(),
-              "timestamp", Option.of(Pair.of("_row_key", "partition_path")),
-              false, Option.empty(), false, Option.of(AVRO_SCHEMA)));
+      assertEquals(i, Integer.parseInt(avroValue.get("id").toString()));
+      assertEquals(partition, avroValue.get("pt").toString());
+      assertEquals(timestamp, Long.parseLong(avroValue.get(orderingField).toString()));
     }
-    return convertedRecords;
   }
 
-  private HoodieWriteConfig getWriteConfig() {
-    Properties properties = new Properties();
-    properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
-    properties.put("hoodie.datasource.write.partitionpath.field", 
"partition_path");
-    properties.put("hoodie.datasource.write.precombine.field", "timestamp");
-    properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
-    properties.put(HoodieTableConfig.PARTITION_FIELDS.key(), "partition_path");
-    properties.put(HoodieTableConfig.PRECOMBINE_FIELD.key(), "timestamp");
+  private HoodieWriteConfig getWriteConfig(Class<?> payloadClass) {
     return getConfigBuilder(true)
-        .withProperties(properties)
+        .withProperties(getKeyGenProps(payloadClass))
         .withParallelism(2, 2)
         .withBulkInsertParallelism(2)
         .withDeleteParallelism(1)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
-        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+        .withSchema(SCHEMA_STR)
         .withPayloadConfig(HoodiePayloadConfig.newBuilder()
-            .withPayloadClass(DefaultHoodieRecordPayload.class.getName())
-            .withPayloadOrderingField("timestamp").build())
+            .fromProperties(getPayloadProps(payloadClass)).build())
         .build();
   }
 }
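
The write config above is assembled from the generator's prop helpers. A small sketch of what those helpers return, assuming the hudi-common test classpath (the sketch class name and main() wrapper are illustrative; the keys match the generator code in the next diff):

    import org.apache.hudi.common.model.PartialUpdateAvroPayload;

    import java.util.Properties;

    import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getKeyGenProps;
    import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getPayloadProps;

    // Illustrative wrapper class; not part of the commit.
    public class PayloadPropsSketch {
      public static void main(String[] args) {
        Class<?> payloadClass = PartialUpdateAvroPayload.class;
        // record key "id", partition path "pt", precombine/ordering field "ts"
        Properties keyGen = getKeyGenProps(payloadClass);
        // compaction payload class plus event-time and ordering fields
        Properties payload = getPayloadProps(payloadClass);
        keyGen.forEach((k, v) -> System.out.println(k + "=" + v));
        payload.forEach((k, v) -> System.out.println(k + "=" + v));
      }
    }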
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieAdaptablePayloadDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieAdaptablePayloadDataGenerator.java
new file mode 100644
index 00000000000..e403a9a3681
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieAdaptablePayloadDataGenerator.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.DebeziumConstants;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.hudi.common.model.HoodieRecord.HOODIE_IS_DELETED_FIELD;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
+public class HoodieAdaptablePayloadDataGenerator {
+
+  public static final Schema SCHEMA = SchemaTestUtil.getSchemaFromResource(HoodieAdaptablePayloadDataGenerator.class, "/adaptable-payload.avsc");
+  public static final Schema SCHEMA_WITH_METAFIELDS = HoodieAvroUtils.addMetadataFields(SCHEMA, false);
+  public static final String SCHEMA_STR = SCHEMA.toString();
+
+  public static Properties getKeyGenProps(Class<?> payloadClass) {
+    String orderingField = new RecordGen(payloadClass).getOrderingField();
+    Properties props = new Properties();
+    props.put("hoodie.datasource.write.recordkey.field", "id");
+    props.put("hoodie.datasource.write.partitionpath.field", "pt");
+    props.put("hoodie.datasource.write.precombine.field", orderingField);
+    props.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "id");
+    props.put(HoodieTableConfig.PARTITION_FIELDS.key(), "pt");
+    props.put(HoodieTableConfig.PRECOMBINE_FIELD.key(), orderingField);
+    return props;
+  }
+
+  public static Properties getPayloadProps(Class<?> payloadClass) {
+    String orderingField = new RecordGen(payloadClass).getOrderingField();
+    Properties props = new Properties();
+    props.put("hoodie.compaction.payload.class", payloadClass.getName());
+    props.put("hoodie.payload.event.time.field", orderingField);
+    props.put("hoodie.payload.ordering.field", orderingField);
+    return props;
+  }
+
+  public static List<HoodieRecord> getInserts(int n, String partition, long ts, Class<?> payloadClass) throws IOException {
+    return getInserts(n, new String[] {partition}, ts, payloadClass);
+  }
+
+  public static List<HoodieRecord> getInserts(int n, String[] partitions, long ts, Class<?> payloadClass) throws IOException {
+    List<HoodieRecord> inserts = new ArrayList<>();
+    RecordGen recordGen = new RecordGen(payloadClass);
+    for (GenericRecord r : getInserts(n, partitions, ts, recordGen)) {
+      inserts.add(getHoodieRecord(r, recordGen.getPayloadClass()));
+    }
+    return inserts;
+  }
+
+  private static List<GenericRecord> getInserts(int n, String[] partitions, long ts, RecordGen recordGen) {
+    return IntStream.range(0, n).mapToObj(id -> {
+      String pt = partitions.length == 0 ? "" : partitions[id % partitions.length];
+      return getInsert(id, pt, ts, recordGen);
+    }).collect(Collectors.toList());
+  }
+
+  private static GenericRecord getInsert(int id, String pt, long ts, RecordGen recordGen) {
+    GenericRecord r = new GenericData.Record(SCHEMA);
+    r.put("id", id);
+    r.put("pt", pt);
+    return recordGen.populateForInsert(r, ts);
+  }
+
+  public static List<HoodieRecord> getUpdates(List<HoodieRecord> baseRecords, long ts, Class<?> payloadClass) throws IOException {
+    RecordGen recordGen = new RecordGen(payloadClass);
+    List<HoodieRecord> updates = new ArrayList<>();
+    Properties props = new Properties();
+    for (HoodieRecord r : baseRecords) {
+      GenericRecord gr = (GenericRecord) r.toIndexedRecord(SCHEMA, props).get().getData();
+      GenericRecord updated = getUpdate(Integer.parseInt(gr.get("id").toString()), gr.get("pt").toString(), ts, recordGen);
+      updates.add(getHoodieRecord(updated, recordGen.getPayloadClass()));
+    }
+    return updates;
+  }
+
+  private static GenericRecord getUpdate(int id, String pt, long ts, RecordGen recordGen) {
+    GenericRecord r = new GenericData.Record(SCHEMA);
+    r.put("id", id);
+    r.put("pt", pt);
+    return recordGen.populateForUpdate(r, ts);
+  }
+
+  public static List<HoodieRecord> getDeletes(List<HoodieRecord> baseRecords, long ts, Class<?> payloadClass) throws IOException {
+    RecordGen recordGen = new RecordGen(payloadClass);
+    List<HoodieRecord> deletes = new ArrayList<>();
+    Properties props = new Properties();
+    for (HoodieRecord r : baseRecords) {
+      GenericRecord gr = (GenericRecord) r.toIndexedRecord(SCHEMA, props).get().getData();
+      GenericRecord deleted = getDelete(Integer.parseInt(gr.get("id").toString()), gr.get("pt").toString(), ts, recordGen);
+      deletes.add(getHoodieRecord(deleted, recordGen.getPayloadClass()));
+    }
+    return deletes;
+  }
+
+  private static GenericRecord getDelete(int id, String pt, long ts, RecordGen recordGen) {
+    GenericRecord r = new GenericData.Record(SCHEMA);
+    r.put("id", id);
+    r.put("pt", pt);
+    return recordGen.populateForDelete(r, ts);
+  }
+
+  private static HoodieRecord getHoodieRecord(GenericRecord r, Class<?> payloadClass) throws IOException {
+    return new HoodieAvroIndexedRecord(r)
+        .prependMetaFields(
+            SCHEMA,
+            SCHEMA_WITH_METAFIELDS,
+            new MetadataValues().setRecordKey(r.get("id").toString()).setPartitionPath(r.get("pt").toString()),
+            new Properties())
+        .wrapIntoHoodieRecordPayloadWithParams(
+            SCHEMA_WITH_METAFIELDS,
+            getPayloadProps(payloadClass),
+            Option.empty(),
+            false,
+            Option.empty(),
+            false,
+            Option.of(SCHEMA));
+  }
+
+  public static class RecordGen {
+
+    public static final Set<Class<?>> SUPPORTED_PAYLOAD_CLASSES = new HashSet<>(Arrays.asList(
+        OverwriteWithLatestAvroPayload.class,
+        OverwriteNonDefaultsWithLatestAvroPayload.class,
+        PartialUpdateAvroPayload.class,
+        DefaultHoodieRecordPayload.class,
+        AWSDmsAvroPayload.class,
+        MySqlDebeziumAvroPayload.class,
+        PostgresDebeziumAvroPayload.class
+    ));
+
+    private final Class<?> payloadClass;
+    private final String orderingField;
+
+    public RecordGen(Class<?> payloadClass) {
+      checkArgument(SUPPORTED_PAYLOAD_CLASSES.contains(payloadClass));
+      this.payloadClass = payloadClass;
+      if (payloadClass == MySqlDebeziumAvroPayload.class) {
+        orderingField = "_event_seq";
+      } else if (payloadClass == PostgresDebeziumAvroPayload.class) {
+        orderingField = "_event_lsn";
+      } else {
+        orderingField = "ts";
+      }
+    }
+
+    public Class<?> getPayloadClass() {
+      return payloadClass;
+    }
+
+    public String getOrderingField() {
+      return orderingField;
+    }
+
+    GenericRecord populateForInsert(GenericRecord r, long ts) {
+      r.put(orderingField, ts);
+      if (payloadClass == AWSDmsAvroPayload.class) {
+        r.put(AWSDmsAvroPayload.OP_FIELD, "I");
+      }
+      return r;
+    }
+
+    GenericRecord populateForUpdate(GenericRecord r, long ts) {
+      r.put(orderingField, ts);
+      if (payloadClass == AWSDmsAvroPayload.class) {
+        r.put(AWSDmsAvroPayload.OP_FIELD, "U");
+      }
+      return r;
+    }
+
+    GenericRecord populateForDelete(GenericRecord r, long ts) {
+      r.put(orderingField, ts);
+      if (payloadClass == MySqlDebeziumAvroPayload.class) {
+        r.put(DebeziumConstants.FLATTENED_OP_COL_NAME, DebeziumConstants.DELETE_OP);
+      } else if (payloadClass == PostgresDebeziumAvroPayload.class) {
+        r.put(DebeziumConstants.FLATTENED_OP_COL_NAME, DebeziumConstants.DELETE_OP);
+      } else if (payloadClass == AWSDmsAvroPayload.class) {
+        r.put(AWSDmsAvroPayload.OP_FIELD, "D");
+      } else {
+        r.put(HOODIE_IS_DELETED_FIELD, true);
+      }
+      return r;
+    }
+  }
+}
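
A quick sketch of the RecordGen contract defined above: most payload classes order by the plain "ts" field, while the Debezium payloads use their CDC sequence fields. This field is also what getKeyGenProps and getPayloadProps wire in as the precombine/ordering field. The sketch class name and main() wrapper are illustrative:

    import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
    import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
    import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
    import org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.RecordGen;

    // Illustrative wrapper class; not part of the commit.
    public class RecordGenOrderingSketch {
      public static void main(String[] args) {
        // Most payloads order by the plain "ts" field...
        System.out.println(new RecordGen(DefaultHoodieRecordPayload.class).getOrderingField());  // ts
        // ...while the Debezium payloads use their CDC sequence fields.
        System.out.println(new RecordGen(MySqlDebeziumAvroPayload.class).getOrderingField());    // _event_seq
        System.out.println(new RecordGen(PostgresDebeziumAvroPayload.class).getOrderingField()); // _event_lsn
      }
    }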
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
index 0553f31f616..f9a67a13710 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
@@ -20,12 +20,14 @@
 package org.apache.hudi.common.testutils;
 
 import org.apache.hudi.avro.MercifulJsonConverter;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Schema;
@@ -39,6 +41,7 @@ import org.apache.avro.specific.SpecificDatumWriter;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -47,6 +50,9 @@ import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
 
+import static org.apache.hudi.avro.HoodieAvroUtils.createHoodieRecordFromAvro;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+
 /**
  * Example row change event based on some example data used by testcases. The data avro schema is
  * src/test/resources/schema1.
@@ -158,6 +164,23 @@ public class RawTripTestPayload implements HoodieRecordPayload<RawTripTestPayloa
         .collect(Collectors.toList());
   }
 
+  public static List<HoodieRecord> asDefaultPayloadRecords(List<HoodieRecord> records) throws IOException {
+    return asDefaultPayloadRecords(records, false);
+  }
+
+  public static List<HoodieRecord> asDefaultPayloadRecords(List<HoodieRecord> records, boolean isDeleted) throws IOException {
+    List<HoodieRecord> convertedRecords = new ArrayList<>();
+    for (HoodieRecord r : records) {
+      GenericRecord avroData = (GenericRecord) ((RawTripTestPayload) r.getData()).getRecordToInsert(AVRO_SCHEMA);
+      avroData.put("_hoodie_is_deleted", isDeleted);
+      convertedRecords.add(
+          createHoodieRecordFromAvro(avroData, DefaultHoodieRecordPayload.class.getName(),
+              "timestamp", Option.of(Pair.of("_row_key", "partition_path")),
+              false, Option.empty(), false, Option.of(AVRO_SCHEMA)));
+    }
+    return convertedRecords;
+  }
+
   public String getPartitionPath() {
     return partitionPath;
   }
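
A hypothetical usage of the new asDefaultPayloadRecords helper above, assuming the existing HoodieTestDataGenerator in the same package; it mirrors what the old convertPayload code in the test used to do. The sketch class name and main() wrapper are illustrative:

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
    import org.apache.hudi.common.testutils.RawTripTestPayload;

    import java.io.IOException;
    import java.util.List;

    import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
    import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC;

    // Illustrative wrapper class; not part of the commit.
    public class AsDefaultPayloadSketch {
      public static void main(String[] args) throws IOException {
        HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
        // Generate RawTripTestPayload inserts, as the old test did...
        List<HoodieRecord> raw = dataGen.generateInsertsForPartition(getCommitTimeAtUTC(0), 4, DEFAULT_FIRST_PARTITION_PATH);
        // ...then rewrap them as DefaultHoodieRecordPayload records; pass true
        // instead of false to flag them as deletes via _hoodie_is_deleted.
        List<HoodieRecord> asDefault = RawTripTestPayload.asDefaultPayloadRecords(raw, false);
        System.out.println(asDefault.size() + " records converted");
      }
    }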
diff --git a/hudi-common/src/test/resources/adaptable-payload.avsc b/hudi-common/src/test/resources/adaptable-payload.avsc
new file mode 100644
index 00000000000..68e48e11afa
--- /dev/null
+++ b/hudi-common/src/test/resources/adaptable-payload.avsc
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+{
+  "type": "record",
+  "name": "PayloadAdaptableRecord",
+  "namespace": "hoodie",
+  "fields": [
+    {
+      "name": "id",
+      "type": "int"
+    },
+    {
+      "name": "pt",
+      "type": "string"
+    },
+    {
+      "name": "ts",
+      "type": [
+        "null",
+        "long"
+      ],
+      "default": null
+    },
+    {
+      "name": "Op",
+      "type": [
+        "null",
+        "string"
+      ],
+      "default": null
+    },
+    {
+      "name": "_change_operation_type",
+      "type": [
+        "null",
+        "string"
+      ],
+      "default": null
+    },
+    {
+      "name": "_hoodie_is_deleted",
+      "type": [
+        "null",
+        "boolean"
+      ],
+      "default": null
+    },
+    {
+      "name": "_event_seq",
+      "type": [
+        "null",
+        "long"
+      ],
+      "default": null
+    },
+    {
+      "name": "_event_lsn",
+      "type": [
+        "null",
+        "long"
+      ],
+      "default": null
+    }
+  ]
+}
