the-other-tim-brown commented on code in PR #13549:
URL: https://github.com/apache/hudi/pull/13549#discussion_r2205622466
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -183,21 +201,217 @@ public void testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMo
    try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
      // One commit; reading one file group containing a log file only
      List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
-     commitToTable(initialRecords, INSERT.value(), writeConfigs);
+     commitToTable(initialRecords, INSERT.value(), true, writeConfigs);
      validateOutputFromFileGroupReader(
          getStorageConf(), getBasePath(), false, 1, recordMergeMode,
          initialRecords, initialRecords);
      // Two commits; reading one file group containing two log files
      List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
      List<HoodieRecord> allRecords = mergeRecordLists(updates, initialRecords);
-     commitToTable(updates, UPSERT.value(), writeConfigs);
+     commitToTable(updates, INSERT.value(), false, writeConfigs);
      validateOutputFromFileGroupReader(
          getStorageConf(), getBasePath(), false, 2, recordMergeMode,
          allRecords, CollectionUtils.combine(initialRecords, updates));
    }
  }
+ private static List<Pair<String, IndexedRecord>> hoodieRecordsToIndexedRecords(List<HoodieRecord> hoodieRecords, Schema schema) {
+   return hoodieRecords.stream().map(r -> {
+     try {
+       return r.toIndexedRecord(schema, CollectionUtils.emptyProps());
+     } catch (IOException e) {
+       throw new RuntimeException(e);
+     }
+   }).filter(Option::isPresent).map(Option::get).map(r -> Pair.of(r.getRecordKey(), r.getData())).collect(Collectors.toList());
+ }
+
+ /**
+  * Write a base file with schema A, then write another base file with schema B.
+  */
+ @Test
+ public void testSchemaEvolutionWhenBaseFilesWithDifferentSchema() throws Exception {
+   Map<String, String> writeConfigs = new HashMap<>(
+       getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+   try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+     dataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+     // Write a base file with schema A
+     List<HoodieRecord> firstRecords = dataGen.generateInsertsForPartition("001", 5, "any_partition");
+     List<Pair<String, IndexedRecord>> firstIndexedRecords = hoodieRecordsToIndexedRecords(firstRecords, dataGen.getExtendedSchema());
+     commitToTable(firstRecords, INSERT.value(), true, writeConfigs, dataGen.getExtendedSchema().toString());
+     validateOutputFromFileGroupReaderWithNativeRecords(
+         getStorageConf(), getBasePath(),
+         true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+         firstIndexedRecords, firstIndexedRecords);
+
+     // Evolve schema
+     dataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+     // Write another base file with schema B
+     List<HoodieRecord> secondRecords = dataGen.generateInsertsForPartition("002", 5, "new_partition");
+     List<Pair<String, IndexedRecord>> secondIndexedRecords = hoodieRecordsToIndexedRecords(secondRecords, dataGen.getExtendedSchema());
+     commitToTable(secondRecords, INSERT.value(), false, writeConfigs, dataGen.getExtendedSchema().toString());
+     List<Pair<String, IndexedRecord>> mergedRecords = CollectionUtils.combine(firstIndexedRecords, secondIndexedRecords);
+     validateOutputFromFileGroupReaderWithNativeRecords(
+         getStorageConf(), getBasePath(),
+         true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+         mergedRecords, mergedRecords);
+   } catch (IOException e) {
+     throw new RuntimeException("Failed to insert data ", e);
Review Comment:
You can just let this throw; it will fail the test.
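
A minimal sketch of the suggested shape, assuming the test keeps its try-with-resources for the data generator: since the method already declares `throws Exception`, the catch-and-rethrow adds nothing.

```java
@Test
public void testSchemaEvolutionWhenBaseFilesWithDifferentSchema() throws Exception {
  try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
    // ... writes and validations as in the diff above ...
  } // no catch: an IOException propagates via `throws Exception` and fails the test
}
```

JUnit then reports the original exception with its full stack trace, which is more useful than a wrapped `RuntimeException`.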
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -183,21 +201,217 @@ public void testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMo
    try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
      // One commit; reading one file group containing a log file only
      List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
-     commitToTable(initialRecords, INSERT.value(), writeConfigs);
+     commitToTable(initialRecords, INSERT.value(), true, writeConfigs);
      validateOutputFromFileGroupReader(
          getStorageConf(), getBasePath(), false, 1, recordMergeMode,
          initialRecords, initialRecords);
      // Two commits; reading one file group containing two log files
      List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
      List<HoodieRecord> allRecords = mergeRecordLists(updates, initialRecords);
-     commitToTable(updates, UPSERT.value(), writeConfigs);
+     commitToTable(updates, INSERT.value(), false, writeConfigs);
      validateOutputFromFileGroupReader(
          getStorageConf(), getBasePath(), false, 2, recordMergeMode,
          allRecords, CollectionUtils.combine(initialRecords, updates));
    }
  }
+ private static List<Pair<String, IndexedRecord>> hoodieRecordsToIndexedRecords(List<HoodieRecord> hoodieRecords, Schema schema) {
+   return hoodieRecords.stream().map(r -> {
+     try {
+       return r.toIndexedRecord(schema, CollectionUtils.emptyProps());
+     } catch (IOException e) {
+       throw new RuntimeException(e);
+     }
+   }).filter(Option::isPresent).map(Option::get).map(r -> Pair.of(r.getRecordKey(), r.getData())).collect(Collectors.toList());
+ }
+
+ /**
+  * Write a base file with schema A, then write another base file with schema B.
+  */
+ @Test
+ public void testSchemaEvolutionWhenBaseFilesWithDifferentSchema() throws Exception {
+   Map<String, String> writeConfigs = new HashMap<>(
+       getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+   try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+     dataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+     // Write a base file with schema A
+     List<HoodieRecord> firstRecords = dataGen.generateInsertsForPartition("001", 5, "any_partition");
+     List<Pair<String, IndexedRecord>> firstIndexedRecords = hoodieRecordsToIndexedRecords(firstRecords, dataGen.getExtendedSchema());
+     commitToTable(firstRecords, INSERT.value(), true, writeConfigs, dataGen.getExtendedSchema().toString());
+     validateOutputFromFileGroupReaderWithNativeRecords(
+         getStorageConf(), getBasePath(),
+         true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+         firstIndexedRecords, firstIndexedRecords);
+
+     // Evolve schema
+     dataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+     // Write another base file with schema B
+     List<HoodieRecord> secondRecords = dataGen.generateInsertsForPartition("002", 5, "new_partition");
+     List<Pair<String, IndexedRecord>> secondIndexedRecords = hoodieRecordsToIndexedRecords(secondRecords, dataGen.getExtendedSchema());
+     commitToTable(secondRecords, INSERT.value(), false, writeConfigs, dataGen.getExtendedSchema().toString());
+     List<Pair<String, IndexedRecord>> mergedRecords = CollectionUtils.combine(firstIndexedRecords, secondIndexedRecords);
+     validateOutputFromFileGroupReaderWithNativeRecords(
+         getStorageConf(), getBasePath(),
+         true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+         mergedRecords, mergedRecords);
+   } catch (IOException e) {
+     throw new RuntimeException("Failed to insert data ", e);
+   }
+ }
+
+ /**
+  * Write a base file with schema A, then write a log file with schema A, then write another base file with schema B.
+  */
+ @Test
+ public void testSchemaEvolutionWhenBaseFileAndLogFilesWithDifferentSchema() throws Exception {
+   Map<String, String> writeConfigs = new HashMap<>(
+       getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+   try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+     dataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+     // Write a base file with schema A
+     List<HoodieRecord> firstRecords = dataGen.generateInsertsForPartition("001", 10, "any_partition");
+     List<Pair<String, IndexedRecord>> firstIndexedRecords = hoodieRecordsToIndexedRecords(firstRecords, dataGen.getExtendedSchema());
+     commitToTable(firstRecords, INSERT.value(), true, writeConfigs, dataGen.getExtendedSchema().toString());
+     validateOutputFromFileGroupReaderWithNativeRecords(
+         getStorageConf(), getBasePath(),
+         true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+         firstIndexedRecords, firstIndexedRecords);
+
+     // Write a log file with schema A
+     List<HoodieRecord> secondRecords = dataGen.generateUniqueUpdates("002", 5);
+     List<Pair<String, IndexedRecord>> secondIndexedRecords = hoodieRecordsToIndexedRecords(secondRecords, dataGen.getExtendedSchema());
+     commitToTable(secondRecords, UPSERT.value(), false, writeConfigs, dataGen.getExtendedSchema().toString());
+     List<Pair<String, IndexedRecord>> mergedRecords = mergeIndexedRecordLists(secondIndexedRecords, firstIndexedRecords);
+     List<Pair<String, IndexedRecord>> unmergedRecords = CollectionUtils.combine(firstIndexedRecords, secondIndexedRecords);
+     validateOutputFromFileGroupReaderWithNativeRecords(
+         getStorageConf(), getBasePath(),
+         true, 1, RecordMergeMode.EVENT_TIME_ORDERING,
+         mergedRecords, unmergedRecords);
+
+     // Evolve schema
+     dataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+     // Write another base file with schema B
+     List<HoodieRecord> thirdRecords = dataGen.generateInsertsForPartition("003", 5, "new_partition");
+     List<Pair<String, IndexedRecord>> thirdIndexedRecords = hoodieRecordsToIndexedRecords(thirdRecords, dataGen.getExtendedSchema());
+     commitToTable(thirdRecords, INSERT.value(), false, writeConfigs, dataGen.getExtendedSchema().toString());
+     mergedRecords = CollectionUtils.combine(mergedRecords, thirdIndexedRecords);
+     unmergedRecords = CollectionUtils.combine(unmergedRecords, thirdIndexedRecords);
+     validateOutputFromFileGroupReaderWithNativeRecords(
+         getStorageConf(), getBasePath(),
+         true, -1, RecordMergeMode.EVENT_TIME_ORDERING,
+         mergedRecords, unmergedRecords);
+   } catch (IOException e) {
+     throw new RuntimeException("Failed to insert data ", e);
Review Comment:
Similarly here and in the other test cases
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -183,21 +201,217 @@ public void testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMo
    try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
      // One commit; reading one file group containing a log file only
      List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
-     commitToTable(initialRecords, INSERT.value(), writeConfigs);
+     commitToTable(initialRecords, INSERT.value(), true, writeConfigs);
      validateOutputFromFileGroupReader(
          getStorageConf(), getBasePath(), false, 1, recordMergeMode,
          initialRecords, initialRecords);
      // Two commits; reading one file group containing two log files
      List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
      List<HoodieRecord> allRecords = mergeRecordLists(updates, initialRecords);
-     commitToTable(updates, UPSERT.value(), writeConfigs);
+     commitToTable(updates, INSERT.value(), false, writeConfigs);
      validateOutputFromFileGroupReader(
          getStorageConf(), getBasePath(), false, 2, recordMergeMode,
          allRecords, CollectionUtils.combine(initialRecords, updates));
    }
  }
+ private static List<Pair<String, IndexedRecord>> hoodieRecordsToIndexedRecords(List<HoodieRecord> hoodieRecords, Schema schema) {
+   return hoodieRecords.stream().map(r -> {
+     try {
+       return r.toIndexedRecord(schema, CollectionUtils.emptyProps());
+     } catch (IOException e) {
+       throw new RuntimeException(e);
+     }
+   }).filter(Option::isPresent).map(Option::get).map(r -> Pair.of(r.getRecordKey(), r.getData())).collect(Collectors.toList());
+ }
+
+ /**
+  * Write a base file with schema A, then write another base file with schema B.
+  */
+ @Test
+ public void testSchemaEvolutionWhenBaseFilesWithDifferentSchema() throws Exception {
+   Map<String, String> writeConfigs = new HashMap<>(
+       getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING, true));
+
+   try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(TRIP_EXAMPLE_SCHEMA, 0xDEEF)) {
+     dataGen.extendSchemaBeforeEvolution(getSchemaEvolutionConfigs());
+
+     // Write a base file with schema A
+     List<HoodieRecord> firstRecords = dataGen.generateInsertsForPartition("001", 5, "any_partition");
+     List<Pair<String, IndexedRecord>> firstIndexedRecords = hoodieRecordsToIndexedRecords(firstRecords, dataGen.getExtendedSchema());
+     commitToTable(firstRecords, INSERT.value(), true, writeConfigs, dataGen.getExtendedSchema().toString());
+     validateOutputFromFileGroupReaderWithNativeRecords(
+         getStorageConf(), getBasePath(),
+         true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+         firstIndexedRecords, firstIndexedRecords);
+
+     // Evolve schema
+     dataGen.extendSchemaAfterEvolution(getSchemaEvolutionConfigs());
+
+     // Write another base file with schema B
+     List<HoodieRecord> secondRecords = dataGen.generateInsertsForPartition("002", 5, "new_partition");
+     List<Pair<String, IndexedRecord>> secondIndexedRecords = hoodieRecordsToIndexedRecords(secondRecords, dataGen.getExtendedSchema());
+     commitToTable(secondRecords, INSERT.value(), false, writeConfigs, dataGen.getExtendedSchema().toString());
+     List<Pair<String, IndexedRecord>> mergedRecords = CollectionUtils.combine(firstIndexedRecords, secondIndexedRecords);
+     validateOutputFromFileGroupReaderWithNativeRecords(
+         getStorageConf(), getBasePath(),
+         true, 0, RecordMergeMode.EVENT_TIME_ORDERING,
+         mergedRecords, mergedRecords);
+   } catch (IOException e) {
+     throw new RuntimeException("Failed to insert data ", e);
+   }
+ }
+
+ /**
+  * Write a base file with schema A, then write a log file with schema A, then write another base file with schema B.
+  */
+ @Test
+ public void testSchemaEvolutionWhenBaseFileAndLogFilesWithDifferentSchema() throws Exception {
Review Comment:
The test name is a bit misleading; the log file shares schema A with the first base file, so it is only the base files whose schemas differ.
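
For example, a hypothetical rename along these lines (name illustrative only):

```java
// Hypothetical name: the schemas differ between the two base files, while the
// log file shares schema A with the first base file.
@Test
public void testSchemaEvolutionWhenBaseFilesWithDifferentSchemaAndLogFile() throws Exception {
  // body unchanged
}
```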
##########
hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnJava.java:
##########
@@ -49,4 +50,15 @@ public HoodieReaderContext<IndexedRecord> getHoodieReaderContext(
  public void assertRecordsEqual(Schema schema, IndexedRecord expected, IndexedRecord actual) {
    assertEquals(expected, actual);
  }
+
+ @Override
+ public void assertRecordMatchesSchema(Schema schema, IndexedRecord record) {
+   // TODO: maybe need to validate the record fields
+   assertEquals(schema, record.getSchema());
Review Comment:
There is a method in `GenericData` named `validate` that can do this.
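
A minimal sketch of that suggestion, assuming JUnit 5's `assertTrue` and Avro's `GenericData.get().validate(Schema, Object)`, which returns `true` when the datum conforms to the schema:

```java
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.IndexedRecord;

@Override
public void assertRecordMatchesSchema(Schema schema, IndexedRecord record) {
  // GenericData#validate recursively checks each field value against the
  // schema, which also covers the TODO about validating the record fields.
  assertTrue(GenericData.get().validate(schema, record));
}
```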
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -285,6 +499,52 @@ protected Map<String, String> getCommonConfigs(RecordMergeMode recordMergeMode,
    return configMapping;
  }
+ private void validateOutputFromFileGroupReaderWithNativeRecords(StorageConfiguration<?> storageConf,
+                                                                 String tablePath,
+                                                                 boolean containsBaseFile,
+                                                                 int expectedLogFileNum,
+                                                                 RecordMergeMode recordMergeMode,
+                                                                 List<Pair<String, IndexedRecord>> expectedRecords,
+                                                                 List<Pair<String, IndexedRecord>> expectedUnmergedRecords) throws Exception {
+   Set<String> metaCols = new HashSet<>(HoodieRecord.HOODIE_META_COLUMNS);
+   HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storageConf, tablePath);
+   TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
+   Schema avroSchema = resolver.getTableAvroSchema();
+   Schema avroSchemaWithoutMeta = resolver.getTableAvroSchema(false);
+   // use reader context for conversion to engine specific objects
+   HoodieReaderContext<T> readerContext = getHoodieReaderContext(tablePath, avroSchema, getStorageConf(), metaClient);
+   List<FileSlice> fileSlices = getFileSlicesToRead(storageConf, tablePath, metaClient, containsBaseFile, expectedLogFileNum);
+   boolean sortOutput = !containsBaseFile;
+   List<T> actualRecordList =
+       readRecordsFromFileGroup(storageConf, tablePath, metaClient, fileSlices, avroSchema, recordMergeMode, false, sortOutput);
+   assertEquals(expectedRecords.size(), actualRecordList.size());
+   actualRecordList.forEach(r -> assertRecordMatchesSchema(avroSchema, r));
+   Set<GenericRecord> actualRecordSet = actualRecordList.stream().map(r -> readerContext.convertToAvroRecord(r, avroSchema))
+       .map(r -> HoodieAvroUtils.removeFields(r, metaCols))
+       .collect(Collectors.toSet());
+   Set<GenericRecord> expectedRecordSet = expectedRecords.stream()
+       .map(r -> (GenericRecord) r.getRight())
+       .map(r -> HoodieAvroUtils.rewriteRecordWithNewSchema(r, avroSchemaWithoutMeta))
+       .collect(Collectors.toSet());
+   compareRecordSets(expectedRecordSet, actualRecordSet);
+ }
+
+ private void compareRecordSets(Set<GenericRecord> expectedRecordSet, Set<GenericRecord> actualRecordSet) {
+   Map<String, GenericRecord> expectedMap = new HashMap<>(expectedRecordSet.size());
+   for (GenericRecord expectedRecord : expectedRecordSet) {
+     expectedMap.put(expectedRecord.get("_row_key").toString(), expectedRecord);
+   }
+   Map<String, GenericRecord> actualMap = new HashMap<>(actualRecordSet.size());
+   for (GenericRecord actualRecord : actualRecordSet) {
+     actualMap.put(actualRecord.get("_row_key").toString(), actualRecord);
+   }
+   for (String key : actualMap.keySet()) {
Review Comment:
Can you add a sanity check that the maps' key sets have the same size? There is an assertion earlier that the lists are the same size, but there is a small chance of duplicates hiding missing records.
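
Something like this sketch, inserted before the loop in `compareRecordSets`:

```java
// Sanity check (sketch): comparing the key sets directly guards against
// duplicate keys in one set masking records missing from the other.
assertEquals(expectedMap.size(), actualMap.size());
assertEquals(expectedMap.keySet(), actualMap.keySet());
```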