yihua commented on code in PR #11881:
URL: https://github.com/apache/hudi/pull/11881#discussion_r1764066709
##########
hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java:
##########
@@ -214,41 +217,71 @@ public void commitToTable(List<HoodieRecord> recordList,
String operation, Map<S
}
@Override
- public void validateRecordsInFileGroup(String tablePath, List<ArrayWritable>
actualRecordList, Schema schema, String fileGroupId) {
+ public void validateRecordsInFileGroup(String tablePath, List<ArrayWritable>
actualRecordList, Schema schema, FileSlice fileSlice, boolean isSkipMerge) {
assertEquals(HoodieAvroUtils.addMetadataFields(HoodieTestDataGenerator.AVRO_SCHEMA),
schema);
+ String fileGroupId = fileSlice.getFileId();
try {
//prepare fg reader records to be compared to the baseline reader
HoodieReaderContext<ArrayWritable> readerContext =
getHoodieReaderContext(tablePath, schema, storageConf);
Map<String, ArrayWritable> recordMap = new HashMap<>();
for (ArrayWritable record : actualRecordList) {
- recordMap.put(readerContext.getRecordKey(record, schema), record);
+ recordMap.put(createUniqueKey(readerContext, schema, record,
isSkipMerge), record);
}
- RecordReader<NullWritable, ArrayWritable> reader =
createRecordReader(tablePath);
+ RecordReader<NullWritable, ArrayWritable> reader =
createRecordReader(tablePath, isSkipMerge);
// use reader to read log file.
NullWritable key = reader.createKey();
ArrayWritable value = reader.createValue();
+ //TODO: [HUDI-8209] get rid of logFileCounts and don't guard
recordMap.remove(uniqueKey);
+ Map<String, Integer> logFileCounts = new HashMap<>();
while (reader.next(key, value)) {
if (readerContext.getValue(value, schema,
HoodieRecord.FILENAME_METADATA_FIELD).toString().contains(fileGroupId)) {
//only evaluate records from the specified filegroup. Maybe there is
a way to get
//hive to do this?
- ArrayWritable compVal =
recordMap.remove(readerContext.getRecordKey(value, schema));
- assertNotNull(compVal);
- ArrayWritableTestUtil.assertArrayWritableEqual(schema, value,
compVal, USE_FAKE_PARTITION);
+ String uniqueKey = createUniqueKey(readerContext, schema, value,
isSkipMerge);
+ Integer seenCount = logFileCounts.get(uniqueKey);
+ boolean isLogFile = isLogFileRec(readerContext, schema, value);
+ if (!isSkipMerge || seenCount == null) {
+ ArrayWritable compVal = recordMap.remove(uniqueKey);
+ assertNotNull(compVal);
+ ArrayWritableTestUtil.assertArrayWritableEqual(schema, value,
compVal, USE_FAKE_PARTITION);
+ if (isSkipMerge && isLogFile) {
+ logFileCounts.put(uniqueKey, 1);
+ }
+ } else {
+ assertTrue(isLogFile);
+ logFileCounts.put(uniqueKey, seenCount + 1);
+ }
}
key = reader.createKey();
value = reader.createValue();
}
reader.close();
assertEquals(0, recordMap.size());
+ for (Integer v : logFileCounts.values()) {
+ assertEquals(2, v);
+ }
} catch (IOException e) {
throw new RuntimeException(e);
}
}
- private RecordReader<NullWritable, ArrayWritable> createRecordReader(String
tablePath) throws IOException {
+ private static boolean isLogFileRec(HoodieReaderContext<ArrayWritable>
readerContext, Schema schema, ArrayWritable record) {
+ return !readerContext.getValue(record, schema,
HoodieRecord.FILENAME_METADATA_FIELD).toString().contains(".parquet");
+ }
+
+ private static String createUniqueKey(HoodieReaderContext<ArrayWritable>
readerContext, Schema schema, ArrayWritable record, boolean isSkipMerge) {
+ if (isSkipMerge) {
+ return readerContext.getRecordKey(record, schema) +
readerContext.getValue(record, schema,
HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
Review Comment:
nit: add a `_` separator between the record key and the commit time in the concatenated key, so the two parts are easy to tell apart when debugging.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java:
##########
@@ -62,7 +62,7 @@ protected boolean doHasNext() throws IOException {
// Output from base file first.
if (baseFileIterator.hasNext()) {
- nextRecord = baseFileIterator.next();
+ nextRecord = readerContext.seal(baseFileIterator.next());
Review Comment:
Good catch
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]