the-other-tim-brown commented on code in PR #13549:
URL: https://github.com/apache/hudi/pull/13549#discussion_r2205622466


##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -183,21 +201,217 @@ public void 
testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMo
     try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF)) {
       // One commit; reading one file group containing a log file only
       List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
-      commitToTable(initialRecords, INSERT.value(), writeConfigs);
+      commitToTable(initialRecords, INSERT.value(), true, writeConfigs);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), false, 1, recordMergeMode,
           initialRecords, initialRecords);
 
       // Two commits; reading one file group containing two log files
       List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
       List<HoodieRecord> allRecords = mergeRecordLists(updates, 
initialRecords);
-      commitToTable(updates, UPSERT.value(), writeConfigs);
+      commitToTable(updates, INSERT.value(), false, writeConfigs);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), false, 2, recordMergeMode,
           allRecords, CollectionUtils.combine(initialRecords, updates));
     }
   }
 
+  private static List<Pair<String, IndexedRecord>> 
hoodieRecordsToIndexedRecords(List<HoodieRecord> hoodieRecords, Schema schema) {
+    return hoodieRecords.stream().map(r -> {
+      try {
+        return r.toIndexedRecord(schema, CollectionUtils.emptyProps());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }).filter(Option::isPresent).map(Option::get).map(r -> 
Pair.of(r.getRecordKey(), r.getData())).collect(Collectors.toList());
+  }
+
+  /**
+   * Write a base file with schema A, then write another base file with schema 
B.
+   */
+  @Test
+  public void testSchemaEvolutionWhenBaseFilesWithDifferentSchema() throws 
Exception {
+    Map<String, String> writeConfigs = new HashMap<>(
+        getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+    try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+      dataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+      // Write a base file with schema A
+      List<HoodieRecord> firstRecords = 
dataGen.generateInsertsForPartition("001", 5, "any_partition");
+      List<Pair<String, IndexedRecord>> firstIndexedRecords = 
hoodieRecordsToIndexedRecords(firstRecords, dataGen.getExtendedSchema());
+      commitToTable(firstRecords, INSERT.value(), true, writeConfigs, 
dataGen.getExtendedSchema().toString());
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+          firstIndexedRecords, firstIndexedRecords);
+
+      // Evolve schema
+      dataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+      // Write another base file with schema B
+      List<HoodieRecord> secondRecords = 
dataGen.generateInsertsForPartition("002", 5, "new_partition");
+      List<Pair<String, IndexedRecord>> secondIndexedRecords = 
hoodieRecordsToIndexedRecords(secondRecords, dataGen.getExtendedSchema());
+      commitToTable(secondRecords, INSERT.value(), false, writeConfigs, 
dataGen.getExtendedSchema().toString());
+      List<Pair<String, IndexedRecord>> mergedRecords = 
CollectionUtils.combine(firstIndexedRecords, secondIndexedRecords);
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+          mergedRecords, mergedRecords);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to insert data ", e);

Review Comment:
   You can just let this throw; it will fail the test.



##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -183,21 +201,217 @@ public void 
testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMo
     try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF)) {
       // One commit; reading one file group containing a log file only
       List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
-      commitToTable(initialRecords, INSERT.value(), writeConfigs);
+      commitToTable(initialRecords, INSERT.value(), true, writeConfigs);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), false, 1, recordMergeMode,
           initialRecords, initialRecords);
 
       // Two commits; reading one file group containing two log files
       List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
       List<HoodieRecord> allRecords = mergeRecordLists(updates, 
initialRecords);
-      commitToTable(updates, UPSERT.value(), writeConfigs);
+      commitToTable(updates, INSERT.value(), false, writeConfigs);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), false, 2, recordMergeMode,
           allRecords, CollectionUtils.combine(initialRecords, updates));
     }
   }
 
+  private static List<Pair<String, IndexedRecord>> 
hoodieRecordsToIndexedRecords(List<HoodieRecord> hoodieRecords, Schema schema) {
+    return hoodieRecords.stream().map(r -> {
+      try {
+        return r.toIndexedRecord(schema, CollectionUtils.emptyProps());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }).filter(Option::isPresent).map(Option::get).map(r -> 
Pair.of(r.getRecordKey(), r.getData())).collect(Collectors.toList());
+  }
+
+  /**
+   * Write a base file with schema A, then write another base file with schema 
B.
+   */
+  @Test
+  public void testSchemaEvolutionWhenBaseFilesWithDifferentSchema() throws 
Exception {
+    Map<String, String> writeConfigs = new HashMap<>(
+        getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+    try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+      dataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+      // Write a base file with schema A
+      List<HoodieRecord> firstRecords = 
dataGen.generateInsertsForPartition("001", 5, "any_partition");
+      List<Pair<String, IndexedRecord>> firstIndexedRecords = 
hoodieRecordsToIndexedRecords(firstRecords, dataGen.getExtendedSchema());
+      commitToTable(firstRecords, INSERT.value(), true, writeConfigs, 
dataGen.getExtendedSchema().toString());
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+          firstIndexedRecords, firstIndexedRecords);
+
+      // Evolve schema
+      dataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+      // Write another base file with schema B
+      List<HoodieRecord> secondRecords = 
dataGen.generateInsertsForPartition("002", 5, "new_partition");
+      List<Pair<String, IndexedRecord>> secondIndexedRecords = 
hoodieRecordsToIndexedRecords(secondRecords, dataGen.getExtendedSchema());
+      commitToTable(secondRecords, INSERT.value(), false, writeConfigs, 
dataGen.getExtendedSchema().toString());
+      List<Pair<String, IndexedRecord>> mergedRecords = 
CollectionUtils.combine(firstIndexedRecords, secondIndexedRecords);
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+          mergedRecords, mergedRecords);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to insert data ", e);
+    }
+  }
+
+  /**
+   * Write a base file with schema A, then write a log file with schema A, 
then write another base file with schema B.
+   */
+  @Test
+  public void testSchemaEvolutionWhenBaseFileAndLogFilesWithDifferentSchema() 
throws Exception {
+    Map<String, String> writeConfigs = new HashMap<>(
+        getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+    try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+      dataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+      // Write a base file with schema A
+      List<HoodieRecord> firstRecords = 
dataGen.generateInsertsForPartition("001", 10, "any_partition");
+      List<Pair<String, IndexedRecord>> firstIndexedRecords = 
hoodieRecordsToIndexedRecords(firstRecords, dataGen.getExtendedSchema());
+      commitToTable(firstRecords, INSERT.value(), true, writeConfigs, 
dataGen.getExtendedSchema().toString());
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+          firstIndexedRecords, firstIndexedRecords);
+
+      // Write a log file with schema A
+      List<HoodieRecord> secondRecords = dataGen.generateUniqueUpdates("002", 
5);
+      List<Pair<String, IndexedRecord>> secondIndexedRecords = 
hoodieRecordsToIndexedRecords(secondRecords, dataGen.getExtendedSchema());
+      commitToTable(secondRecords, UPSERT.value(), false, writeConfigs, 
dataGen.getExtendedSchema().toString());
+      List<Pair<String, IndexedRecord>> mergedRecords = 
mergeIndexedRecordLists(secondIndexedRecords, firstIndexedRecords);
+      List<Pair<String, IndexedRecord>> unmergedRecords = 
CollectionUtils.combine(firstIndexedRecords, secondIndexedRecords);
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 1, RecordMergeMode.EVENT_TIME_ORDERING,
+          mergedRecords, unmergedRecords);
+
+      // Evolve schema
+      dataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+      // Write another base file with schema B
+      List<HoodieRecord> thirdRecords = 
dataGen.generateInsertsForPartition("003", 5, "new_partition");
+      List<Pair<String, IndexedRecord>> thirdIndexedRecords = 
hoodieRecordsToIndexedRecords(thirdRecords, dataGen.getExtendedSchema());
+      commitToTable(thirdRecords, INSERT.value(), false, writeConfigs, 
dataGen.getExtendedSchema().toString());
+      mergedRecords = CollectionUtils.combine(mergedRecords, 
thirdIndexedRecords);
+      unmergedRecords = CollectionUtils.combine(unmergedRecords, 
thirdIndexedRecords);
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, -1, RecordMergeMode.EVENT_TIME_ORDERING,
+          mergedRecords, unmergedRecords);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to insert data ", e);

Review Comment:
   Similarly here and in the other test cases: the exception can simply 
be allowed to propagate instead of being caught and rethrown.



##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -183,21 +201,217 @@ public void 
testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMo
     try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF)) {
       // One commit; reading one file group containing a log file only
       List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
-      commitToTable(initialRecords, INSERT.value(), writeConfigs);
+      commitToTable(initialRecords, INSERT.value(), true, writeConfigs);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), false, 1, recordMergeMode,
           initialRecords, initialRecords);
 
       // Two commits; reading one file group containing two log files
       List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
       List<HoodieRecord> allRecords = mergeRecordLists(updates, 
initialRecords);
-      commitToTable(updates, UPSERT.value(), writeConfigs);
+      commitToTable(updates, INSERT.value(), false, writeConfigs);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), false, 2, recordMergeMode,
           allRecords, CollectionUtils.combine(initialRecords, updates));
     }
   }
 
+  private static List<Pair<String, IndexedRecord>> 
hoodieRecordsToIndexedRecords(List<HoodieRecord> hoodieRecords, Schema schema) {
+    return hoodieRecords.stream().map(r -> {
+      try {
+        return r.toIndexedRecord(schema, CollectionUtils.emptyProps());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }).filter(Option::isPresent).map(Option::get).map(r -> 
Pair.of(r.getRecordKey(), r.getData())).collect(Collectors.toList());
+  }
+
+  /**
+   * Write a base file with schema A, then write another base file with schema 
B.
+   */
+  @Test
+  public void testSchemaEvolutionWhenBaseFilesWithDifferentSchema() throws 
Exception {
+    Map<String, String> writeConfigs = new HashMap<>(
+        getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+    try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+      dataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+      // Write a base file with schema A
+      List<HoodieRecord> firstRecords = 
dataGen.generateInsertsForPartition("001", 5, "any_partition");
+      List<Pair<String, IndexedRecord>> firstIndexedRecords = 
hoodieRecordsToIndexedRecords(firstRecords, dataGen.getExtendedSchema());
+      commitToTable(firstRecords, INSERT.value(), true, writeConfigs, 
dataGen.getExtendedSchema().toString());
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+          firstIndexedRecords, firstIndexedRecords);
+
+      // Evolve schema
+      dataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+      // Write another base file with schema B
+      List<HoodieRecord> secondRecords = 
dataGen.generateInsertsForPartition("002", 5, "new_partition");
+      List<Pair<String, IndexedRecord>> secondIndexedRecords = 
hoodieRecordsToIndexedRecords(secondRecords, dataGen.getExtendedSchema());
+      commitToTable(secondRecords, INSERT.value(), false, writeConfigs, 
dataGen.getExtendedSchema().toString());
+      List<Pair<String, IndexedRecord>> mergedRecords = 
CollectionUtils.combine(firstIndexedRecords, secondIndexedRecords);
+      validateOutputFromFileGroupReaderWithNativeRecords(
+          getStorageConf(), getBasePath(),
+          true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+          mergedRecords, mergedRecords);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to insert data ", e);
+    }
+  }
+
+  /**
+   * Write a base file with schema A, then write a log file with schema A, 
then write another base file with schema B.
+   */
+  @Test
+  public void testSchemaEvolutionWhenBaseFileAndLogFilesWithDifferentSchema() 
throws Exception {

Review Comment:
   The test name is a bit misleading; it is the base files that differ in 
schema.



##########
hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnJava.java:
##########
@@ -49,4 +50,15 @@ public HoodieReaderContext<IndexedRecord> 
getHoodieReaderContext(
   public void assertRecordsEqual(Schema schema, IndexedRecord expected, 
IndexedRecord actual) {
     assertEquals(expected, actual);
   }
+
+  @Override
+  public void assertRecordMatchesSchema(Schema schema, IndexedRecord record) {
+    // TODO: maybe need to validate the record fields
+    assertEquals(schema, record.getSchema());

Review Comment:
   There is a method named `validate` in `GenericData` that can do this.



##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -285,6 +499,52 @@ protected Map<String, String> 
getCommonConfigs(RecordMergeMode recordMergeMode,
     return configMapping;
   }
 
+  private void 
validateOutputFromFileGroupReaderWithNativeRecords(StorageConfiguration<?> 
storageConf,
+                                                                    String 
tablePath,
+                                                                    boolean 
containsBaseFile,
+                                                                    int 
expectedLogFileNum,
+                                                                    
RecordMergeMode recordMergeMode,
+                                                                    
List<Pair<String, IndexedRecord>> expectedRecords,
+                                                                    
List<Pair<String, IndexedRecord>> expectedUnmergedRecords) throws Exception {
+    Set<String> metaCols = new HashSet<>(HoodieRecord.HOODIE_META_COLUMNS);
+    HoodieTableMetaClient metaClient = 
HoodieTestUtils.createMetaClient(storageConf, tablePath);
+    TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
+    Schema avroSchema = resolver.getTableAvroSchema();
+    Schema avroSchemaWithoutMeta = resolver.getTableAvroSchema(false);
+    // use reader context for conversion to engine specific objects
+    HoodieReaderContext<T> readerContext = getHoodieReaderContext(tablePath, 
avroSchema, getStorageConf(), metaClient);
+    List<FileSlice> fileSlices = getFileSlicesToRead(storageConf, tablePath, 
metaClient, containsBaseFile, expectedLogFileNum);
+    boolean sortOutput = !containsBaseFile;
+    List<T> actualRecordList =
+        readRecordsFromFileGroup(storageConf, tablePath, metaClient, 
fileSlices, avroSchema, recordMergeMode, false, sortOutput);
+    assertEquals(expectedRecords.size(), actualRecordList.size());
+    actualRecordList.forEach(r -> assertRecordMatchesSchema(avroSchema, r));
+    Set<GenericRecord> actualRecordSet = actualRecordList.stream().map(r ->  
readerContext.convertToAvroRecord(r, avroSchema))
+        .map(r -> HoodieAvroUtils.removeFields(r, metaCols))
+        .collect(Collectors.toSet());
+    Set<GenericRecord> expectedRecordSet = expectedRecords.stream()
+        .map(r -> (GenericRecord) r.getRight())
+        .map(r -> HoodieAvroUtils.rewriteRecordWithNewSchema(r, 
avroSchemaWithoutMeta))
+        .collect(Collectors.toSet());
+    compareRecordSets(expectedRecordSet, actualRecordSet);
+  }
+
+  private void compareRecordSets(Set<GenericRecord> expectedRecordSet, 
Set<GenericRecord> actualRecordSet) {
+    Map<String, GenericRecord> expectedMap = new 
HashMap<>(expectedRecordSet.size());
+    for (GenericRecord expectedRecord : expectedRecordSet) {
+      expectedMap.put(expectedRecord.get("_row_key").toString(), 
expectedRecord);
+    }
+    Map<String, GenericRecord> actualMap = new 
HashMap<>(actualRecordSet.size());
+    for (GenericRecord actualRecord : actualRecordSet) {
+      actualMap.put(actualRecord.get("_row_key").toString(), actualRecord);
+    }
+    for (String key : actualMap.keySet()) {

Review Comment:
   Can you add a sanity check that the maps' key sets have the same size? There 
is an earlier assertion that the lists are the same size, but there is a small 
chance of duplicates hiding missing records.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to