suryaprasanna commented on code in PR #7729:
URL: https://github.com/apache/hudi/pull/7729#discussion_r1226050070
##########
hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java:
##########
@@ -1391,14 +1404,128 @@ public void
testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
throw new UncheckedIOException(io);
}
});
- assertEquals(100, readKeys.size(), "Stream collect should return 100
records, since 2nd block is rolled back");
- assertEquals(50, newEmptyPayloads.size(), "Stream collect should return
all 50 records with empty payloads");
- List<String> firstBlockRecords =
- copyOfRecords1.stream().map(s -> ((GenericRecord)
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
- .collect(Collectors.toList());
- Collections.sort(firstBlockRecords);
+ if (useScanv2) {
+ assertEquals(100, readKeys.size(), "Stream collect should return 100
records, since 2nd block is rolled back");
+ assertEquals(50, newEmptyPayloads.size(), "Stream collect should return
all 50 records with empty payloads");
+ List<String> firstBlockRecords =
+ copyOfRecords1.stream().map(s -> ((GenericRecord)
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
+ .collect(Collectors.toList());
+ Collections.sort(firstBlockRecords);
+ Collections.sort(readKeys);
+ assertEquals(firstBlockRecords, readKeys, "CompositeAvroLogReader should
return 150 records from 2 versions");
+ } else {
+ assertEquals(200, readKeys.size(), "Stream collect should return all 200
records, since 2nd block that is being rolled back is not next to rollback
block.");
+ assertEquals(50, newEmptyPayloads.size(), "Stream collect should returns
empty records, since 2nd block that is being rolled back is not next to
rollback block.");
+ List<String> firstBlockRecords =
+ copyOfRecords1.stream().map(s -> ((GenericRecord)
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
+ .collect(Collectors.toList());
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("testArguments")
+ public void
testAvroLogRecordReaderWithCommitBeforeAndAfterRollback(ExternalSpillableMap.DiskMapType
diskMapType,
+ boolean
isCompressionEnabled,
+ boolean
readBlocksLazily,
+ boolean useScanv2)
+ throws IOException, URISyntaxException, InterruptedException {
+ Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+ // Set a small threshold so that every block is a new version
+ String fileId = "test-fileid111";
+ Writer writer =
+
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withFileId(fileId).overBaseCommit("100").withFs(fs).build();
+
+ // Write 1 -> 100 records are written
+ SchemaTestUtil testUtil = new SchemaTestUtil();
Review Comment:
Created the following ticket to address the refactoring:
https://issues.apache.org/jira/browse/HUDI-6357
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org