nsivabalan commented on code in PR #13010:
URL: https://github.com/apache/hudi/pull/13010#discussion_r2013058804
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -288,13 +301,21 @@ private FileSlice
getFileSliceToRead(StorageConfiguration<?> storageConf,
return fileSlice;
}
- private List<T> readRecordsFromFileGroup(StorageConfiguration<?> storageConf,
- String tablePath,
- HoodieTableMetaClient metaClient,
- FileSlice fileSlice,
- Schema avroSchema,
- RecordMergeMode recordMergeMode,
- boolean isSkipMerge) throws
Exception {
+ private FileSlice
getFileSliceToReadIncludingInflight(StorageConfiguration<?> storageConf, String
tablePath,
+ HoodieTableMetaClient
metaClient, String[] partitionPaths,
+ boolean
containsBaseFile, int expectedLogFileNum) {
+ HoodieEngineContext engineContext = new
HoodieLocalEngineContext(storageConf);
+ HoodieTableFileSystemView fsView =
HoodieTableFileSystemView.fileListingBasedFileSystemView(engineContext,
metaClient, metaClient.getActiveTimeline(), false);
Review Comment:
Where are we closing the FSV (HoodieTableFileSystemView)? It should be closed once we're done with it so its resources are released.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala:
##########
@@ -176,4 +184,25 @@ class TestHoodieFileGroupReaderOnSpark extends
TestHoodieFileGroupReaderBase[Int
assertEquals(expectedOrderingValue,
metadataMap.get(HoodieReaderContext.INTERNAL_META_ORDERING_FIELD))
}
+
+ @ParameterizedTest
+ @EnumSource(classOf[RecordMergeMode])
+ @throws[Exception]
+ def testReadFileGroupInflightData(recordMergeMode: RecordMergeMode): Unit = {
+ val writeConfigs = new util.HashMap[String,
String](getCommonConfigs(recordMergeMode))
+ writeConfigs.put(DataSourceWriteOptions.TABLE_TYPE.key(),
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ try {
+ val dataGen = new HoodieTestDataGenerator(0xDEEF)
+ try {
+ // One commit; reading one file group containing a base file only
+ commitToTable(dataGen.generateInserts("001", 100), INSERT.value,
writeConfigs)
+ validateOutputFromFileGroupReader(getStorageConf, getBasePath,
dataGen.getPartitionPaths, true, 0, recordMergeMode)
+
+ commitToTable(dataGen.generateUniqueUpdates("003", 100), UPSERT.value,
writeConfigs)
+ val metaClient = HoodieTestUtils.createMetaClient(getStorageConf,
getBasePath)
+ metaClient.getStorage.deleteFile(new
StoragePath(metaClient.getTimelinePath, new
DefaultInstantFileNameGenerator().getFileName(metaClient.getActiveTimeline.lastInstant().get())))
+ validateOutputFromFileGroupReaderIncludingInflight(getStorageConf,
getBasePath, dataGen.getPartitionPaths, true, 1, recordMergeMode, true)
Review Comment:
I would expect a scenario where we have a log file from a concurrent writer (or similar), and when we initialize the FG reader with "allowInflightInstants = true", the FG reader should return the records from the inflight log file as well.
We don't need to add a functional test for this — just write a test directly against a FG reader (essentially a single file group).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]