jonvex commented on code in PR #13549:
URL: https://github.com/apache/hudi/pull/13549#discussion_r2216267520
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -183,21 +201,204 @@ public void
testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMo
try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(0xDEEF)) {
// One commit; reading one file group containing a log file only
List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
- commitToTable(initialRecords, INSERT.value(), writeConfigs);
+ commitToTable(initialRecords, INSERT.value(), true, writeConfigs);
validateOutputFromFileGroupReader(
getStorageConf(), getBasePath(), false, 1, recordMergeMode,
initialRecords, initialRecords);
// Two commits; reading one file group containing two log files
List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
List<HoodieRecord> allRecords = mergeRecordLists(updates,
initialRecords);
- commitToTable(updates, UPSERT.value(), writeConfigs);
+ commitToTable(updates, INSERT.value(), false, writeConfigs);
validateOutputFromFileGroupReader(
getStorageConf(), getBasePath(), false, 2, recordMergeMode,
allRecords, CollectionUtils.combine(initialRecords, updates));
}
}
+ private static List<Pair<String, IndexedRecord>>
hoodieRecordsToIndexedRecords(List<HoodieRecord> hoodieRecords, Schema schema) {
+ return hoodieRecords.stream().map(r -> {
+ try {
+ return r.toIndexedRecord(schema, CollectionUtils.emptyProps());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }).filter(Option::isPresent).map(Option::get).map(r ->
Pair.of(r.getRecordKey(), r.getData())).collect(Collectors.toList());
+ }
+
+ /**
+ * Write a base file with schema A, then write another base file with schema
B.
+ */
+ @Test
+ public void testSchemaEvolutionWhenBaseFilesWithDifferentSchema() throws
Exception {
+ Map<String, String> writeConfigs = new HashMap<>(
+ getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+ try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+ dataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+ // Write a base file with schema A
+ List<HoodieRecord> firstRecords =
dataGen.generateInsertsForPartition("001", 5, "any_partition");
+ List<Pair<String, IndexedRecord>> firstIndexedRecords =
hoodieRecordsToIndexedRecords(firstRecords, dataGen.getExtendedSchema());
+ commitToTable(firstRecords, INSERT.value(), true, writeConfigs,
dataGen.getExtendedSchema().toString());
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+ firstIndexedRecords);
+
+ // Evolve schema
+ dataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+ // Write another base file with schema B
+ List<HoodieRecord> secondRecords =
dataGen.generateInsertsForPartition("002", 5, "new_partition");
+ List<Pair<String, IndexedRecord>> secondIndexedRecords =
hoodieRecordsToIndexedRecords(secondRecords, dataGen.getExtendedSchema());
+ commitToTable(secondRecords, INSERT.value(), false, writeConfigs,
dataGen.getExtendedSchema().toString());
+ List<Pair<String, IndexedRecord>> mergedRecords =
CollectionUtils.combine(firstIndexedRecords, secondIndexedRecords);
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+ mergedRecords);
+ }
+ }
+
+ /**
+ * Write a base file with schema A, then write a log file with schema A,
then write another base file with schema B.
+ */
+ @Test
+ public void testSchemaEvolutionWhenBaseFileHasDifferentSchemaThanLogFiles()
throws Exception {
+ Map<String, String> writeConfigs = new HashMap<>(
+ getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+ try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+ dataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+ // Write a base file with schema A
+ List<HoodieRecord> firstRecords =
dataGen.generateInsertsForPartition("001", 10, "any_partition");
+ List<Pair<String, IndexedRecord>> firstIndexedRecords =
hoodieRecordsToIndexedRecords(firstRecords, dataGen.getExtendedSchema());
+ commitToTable(firstRecords, INSERT.value(), true, writeConfigs,
dataGen.getExtendedSchema().toString());
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+ firstIndexedRecords);
+
+ // Write a log file with schema A
+ List<HoodieRecord> secondRecords = dataGen.generateUniqueUpdates("002",
5);
+ List<Pair<String, IndexedRecord>> secondIndexedRecords =
hoodieRecordsToIndexedRecords(secondRecords, dataGen.getExtendedSchema());
+ commitToTable(secondRecords, UPSERT.value(), false, writeConfigs,
dataGen.getExtendedSchema().toString());
+ List<Pair<String, IndexedRecord>> mergedRecords =
mergeIndexedRecordLists(secondIndexedRecords, firstIndexedRecords);
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 1, RecordMergeMode.EVENT_TIME_ORDERING,
+ mergedRecords);
+
+ // Evolve schema
+ dataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+ // Write another base file with schema B
+ List<HoodieRecord> thirdRecords =
dataGen.generateInsertsForPartition("003", 5, "new_partition");
+ List<Pair<String, IndexedRecord>> thirdIndexedRecords =
hoodieRecordsToIndexedRecords(thirdRecords, dataGen.getExtendedSchema());
+ commitToTable(thirdRecords, INSERT.value(), false, writeConfigs,
dataGen.getExtendedSchema().toString());
+ mergedRecords = CollectionUtils.combine(mergedRecords,
thirdIndexedRecords);
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ // use -1 to prevent validation of numlogfiles because one fg has a
log file but the other doesn't
+ true, -1, RecordMergeMode.EVENT_TIME_ORDERING,
+ mergedRecords);
+ }
+ }
+
+ /**
+ * Write a base file with schema A, then write a log file with schema A,
then write another log file with schema B.
+ */
+ @Test
+ public void testSchemaEvolutionWhenLogFilesWithDifferentSchema() throws
Exception {
+ Map<String, String> writeConfigs = new HashMap<>(
+ getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+ try (HoodieTestDataGenerator baseFileDataGen =
+ new HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+ baseFileDataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+ // Write base file with schema A
+ List<HoodieRecord> firstRecords = baseFileDataGen.generateInserts("001",
100);
+ List<Pair<String, IndexedRecord>> firstIndexedRecords =
hoodieRecordsToIndexedRecords(firstRecords,
baseFileDataGen.getExtendedSchema());
+ commitToTable(firstRecords, INSERT.value(), true, writeConfigs,
baseFileDataGen.getExtendedSchema().toString());
+ validateOutputFromFileGroupReaderWithNativeRecords(
+ getStorageConf(), getBasePath(),
+ true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+ firstIndexedRecords);
+
+ // Write log file with schema A
+ List<HoodieRecord> secondRecords =
baseFileDataGen.generateUniqueUpdates("002", 50);
+ List<Pair<String, IndexedRecord>> secondIndexedRecords =
hoodieRecordsToIndexedRecords(secondRecords,
baseFileDataGen.getExtendedSchema());
Review Comment:
```
private void
validateOutputFromFileGroupReaderWithNativeRecords(StorageConfiguration<?>
storageConf,
String
tablePath,
boolean
containsBaseFile,
int
expectedLogFileNum,
RecordMergeMode recordMergeMode,
List<Pair<String, IndexedRecord>> expectedRecords) throws Exception {
```
In the validation, we already assert the number of log files.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]