xushiyan commented on code in PR #8675:
URL: https://github.com/apache/hudi/pull/8675#discussion_r1190611852


##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java:
##########
@@ -38,113 +41,139 @@
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
-import java.util.Properties;
 import java.util.stream.Collectors;
-
-import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
-import static org.apache.hudi.avro.HoodieAvroUtils.createHoodieRecordFromAvro;
-import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
-import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.SCHEMA;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.SCHEMA_STR;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.SCHEMA_WITH_METAFIELDS;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getDeletes;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getInserts;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getKeyGenProps;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getPayloadProps;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getUpdates;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieMergedReadHandle extends SparkClientFunctionalTestHarness {
 
-  private HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+  private static Stream<Arguments> avroPayloadClasses() {
+    return Stream.of(
+        Arguments.of(COPY_ON_WRITE, OverwriteWithLatestAvroPayload.class),
+        Arguments.of(COPY_ON_WRITE, OverwriteNonDefaultsWithLatestAvroPayload.class),
+        Arguments.of(COPY_ON_WRITE, PartialUpdateAvroPayload.class),
+        Arguments.of(COPY_ON_WRITE, DefaultHoodieRecordPayload.class),
+        Arguments.of(MERGE_ON_READ, OverwriteWithLatestAvroPayload.class),
+        Arguments.of(MERGE_ON_READ, OverwriteNonDefaultsWithLatestAvroPayload.class),
+        Arguments.of(MERGE_ON_READ, PartialUpdateAvroPayload.class),
+        Arguments.of(MERGE_ON_READ, DefaultHoodieRecordPayload.class)

Review Comment:
   Purposely left out the AWSDmsPayload and Debezium payload cases in
   https://github.com/apache/hudi/pull/8675/commits/875f77fd40a5b93dd7fcfa628e981f575372d6e7
   to land this patch. When doing the proper fix for payloads, those classes
   can be added here to cover the custom delete-marker scenario.
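   For reference, once that fix lands, a second provider could be added along
   these lines (a sketch only; AWSDmsAvroPayload and PostgresDebeziumAvroPayload
   from hudi-common are my assumption for the custom delete-marker classes, and
   the provider name is hypothetical):

       // hypothetical provider covering payloads with custom delete markers;
       // reuses the Stream/Arguments imports already added in this patch
       private static Stream<Arguments> avroPayloadClassesWithCustomDeleteMarkers() {
         return Stream.of(
             Arguments.of(COPY_ON_WRITE, AWSDmsAvroPayload.class),
             Arguments.of(MERGE_ON_READ, AWSDmsAvroPayload.class),
             Arguments.of(COPY_ON_WRITE, PostgresDebeziumAvroPayload.class),
             Arguments.of(MERGE_ON_READ, PostgresDebeziumAvroPayload.class)
         );
       }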



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergedReadHandle.java:
##########
@@ -38,113 +41,139 @@
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
-import java.util.Properties;
 import java.util.stream.Collectors;
-
-import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
-import static org.apache.hudi.avro.HoodieAvroUtils.createHoodieRecordFromAvro;
-import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
-import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.SCHEMA;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.SCHEMA_STR;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.SCHEMA_WITH_METAFIELDS;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getDeletes;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getInserts;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getKeyGenProps;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getPayloadProps;
+import static org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator.getUpdates;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieMergedReadHandle extends SparkClientFunctionalTestHarness {
 
-  private HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+  private static Stream<Arguments> avroPayloadClasses() {
+    return Stream.of(
+        Arguments.of(COPY_ON_WRITE, OverwriteWithLatestAvroPayload.class),
+        Arguments.of(COPY_ON_WRITE, OverwriteNonDefaultsWithLatestAvroPayload.class),
+        Arguments.of(COPY_ON_WRITE, PartialUpdateAvroPayload.class),
+        Arguments.of(COPY_ON_WRITE, DefaultHoodieRecordPayload.class),
+        Arguments.of(MERGE_ON_READ, OverwriteWithLatestAvroPayload.class),
+        Arguments.of(MERGE_ON_READ, OverwriteNonDefaultsWithLatestAvroPayload.class),
+        Arguments.of(MERGE_ON_READ, PartialUpdateAvroPayload.class),
+        Arguments.of(MERGE_ON_READ, DefaultHoodieRecordPayload.class)
+    );
+  }
 
   @ParameterizedTest
-  @EnumSource(HoodieTableType.class)
-  public void testReadLatestRecordsWithDeletes(HoodieTableType tableType) throws IOException {
-    HoodieWriteConfig writeConfig = getWriteConfig();
+  @MethodSource("avroPayloadClasses")
+  public void testReadLatestRecordsWithDeletes(HoodieTableType tableType, Class<?> payloadClass) throws IOException {
+    HoodieWriteConfig writeConfig = getWriteConfig(payloadClass);
     HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType, writeConfig.getProps());
     try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
       final int totalRecords = 4;
+      final String partition = "foo";
+
       // 1st batch: inserts
       String commitTimeAtEpoch0 = getCommitTimeAtUTC(0);
-      List<HoodieRecord> insertsAtEpoch0 = convertPayload(dataGen
-          .generateInsertsForPartition(commitTimeAtEpoch0, totalRecords, DEFAULT_FIRST_PARTITION_PATH), DefaultHoodieRecordPayload.class);
+      List<HoodieRecord> insertsAtEpoch0 = getInserts(totalRecords, partition, 0, payloadClass);
       client.startCommitWithTime(commitTimeAtEpoch0);
       client.upsert(jsc().parallelize(insertsAtEpoch0, 1), commitTimeAtEpoch0);
-      validate(metaClient, writeConfig, totalRecords, commitTimeAtEpoch0, DEFAULT_FIRST_PARTITION_PATH, 0);
+      doMergedReadAndValidate(metaClient, writeConfig, totalRecords, partition, 0, payloadClass);
 
       // 2nd batch: normal updates
       String commitTimeAtEpoch5 = getCommitTimeAtUTC(5);
-      List<HoodieRecord> updatesAtEpoch5 = convertPayload(dataGen
-          .generateUpdatesWithTimestamp(commitTimeAtEpoch5, insertsAtEpoch0, 5), DefaultHoodieRecordPayload.class);
+      List<HoodieRecord> updatesAtEpoch5 = getUpdates(insertsAtEpoch0, 5, payloadClass);
       client.startCommitWithTime(commitTimeAtEpoch5);
-      client.upsert(jsc().parallelize(updatesAtEpoch5, 1), commitTimeAtEpoch5);
-      validate(metaClient, writeConfig, totalRecords, commitTimeAtEpoch5, DEFAULT_FIRST_PARTITION_PATH, 5);
+      JavaRDD<WriteStatus> writeStatusesAtEpoch5 = client.upsert(jsc().parallelize(updatesAtEpoch5, 1), commitTimeAtEpoch5);
+      assertNoWriteErrors(writeStatusesAtEpoch5.collect());
+      doMergedReadAndValidate(metaClient, writeConfig, totalRecords, partition, 5, payloadClass);
 
-      // 3rd batch: delete a record
+      // 3rd batch: delete the record with id 3 (the last one)
       String commitTimeAtEpoch6 = getCommitTimeAtUTC(6);
       client.startCommitWithTime(commitTimeAtEpoch6);
-      List<HoodieRecord> deletesAtEpoch6 = convertPayload(dataGen
-          .generateUpdatesWithTimestamp(commitTimeAtEpoch6, Collections.singletonList(insertsAtEpoch0.get(0)), 6), DefaultHoodieRecordPayload.class, true);
-      client.upsert(jsc().parallelize(deletesAtEpoch6, 1), commitTimeAtEpoch6);
-      validate(metaClient, writeConfig, totalRecords - 1, commitTimeAtEpoch5, DEFAULT_FIRST_PARTITION_PATH, 5);
+      List<HoodieRecord> deletesAtEpoch6 = getDeletes(updatesAtEpoch5.subList(totalRecords - 1, totalRecords), 6, payloadClass);
+      JavaRDD<WriteStatus> writeStatusesAtEpoch6 = client.upsert(jsc().parallelize(deletesAtEpoch6, 1), commitTimeAtEpoch6);
+      assertNoWriteErrors(writeStatusesAtEpoch6.collect());
+      doMergedReadAndValidate(metaClient, writeConfig, totalRecords - 1, partition, 5, payloadClass);
 
       // 4th batch: normal updates
       String commitTimeAtEpoch9 = getCommitTimeAtUTC(9);
-      List<HoodieRecord> updatesAtEpoch9 = convertPayload(dataGen
-          .generateUpdatesWithTimestamp(commitTimeAtEpoch9, insertsAtEpoch0, 9), DefaultHoodieRecordPayload.class);
+      List<HoodieRecord> updatesAtEpoch9 = getUpdates(updatesAtEpoch5, 9, payloadClass);
       client.startCommitWithTime(commitTimeAtEpoch9);
       client.upsert(jsc().parallelize(updatesAtEpoch9, 1), commitTimeAtEpoch9);
-      validate(metaClient, writeConfig, totalRecords, commitTimeAtEpoch9, DEFAULT_FIRST_PARTITION_PATH, 9);
-
+      doMergedReadAndValidate(metaClient, writeConfig, totalRecords, partition, 9, payloadClass);
     }
   }
 
+  private static Stream<Arguments> avroPayloadClassesThatHonorOrdering() {
+    return Stream.of(
+        Arguments.of(COPY_ON_WRITE, PartialUpdateAvroPayload.class),
+        Arguments.of(COPY_ON_WRITE, DefaultHoodieRecordPayload.class),
+        Arguments.of(MERGE_ON_READ, PartialUpdateAvroPayload.class),
+        Arguments.of(MERGE_ON_READ, DefaultHoodieRecordPayload.class)

Review Comment:
   ditto
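   For context on the ordering-aware split: OverwriteWithLatestAvroPayload's
   combineAndGetUpdateValue replaces the stored record with the incoming one
   unconditionally, while DefaultHoodieRecordPayload consults the configured
   ordering field and keeps the stored record when it is newer. A minimal
   sketch of the distinction (the signatures and the
   hoodie.payload.ordering.field key reflect my reading of hudi-common, not
   this patch):

       // stored row has ts=5; incoming row has ts=3 but arrives later
       Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"r\",\"fields\":["
           + "{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"ts\",\"type\":\"long\"}]}");
       GenericRecord stored = new GenericData.Record(schema);
       stored.put("id", "1"); stored.put("ts", 5L);
       GenericRecord incoming = new GenericData.Record(schema);
       incoming.put("id", "1"); incoming.put("ts", 3L);

       // overwrite semantics: the incoming record wins regardless of ts
       Option<IndexedRecord> a = new OverwriteWithLatestAvroPayload(incoming, 3L)
           .combineAndGetUpdateValue(stored, schema); // -> incoming

       // ordering-aware semantics: the stored record is kept, since ts 5 > 3
       Properties props = new Properties();
       props.put("hoodie.payload.ordering.field", "ts");
       Option<IndexedRecord> b = new DefaultHoodieRecordPayload(incoming, 3L)
           .combineAndGetUpdateValue(stored, schema, props); // -> stored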



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
