jonvex commented on PR #9774:
URL: https://github.com/apache/hudi/pull/9774#issuecomment-1749673213
> > if they have the same precombine, the base file records will be chosen over the log file records.
>
> Can you show me where this is happening in the current code. As far as I know, clustering will use the merged log record reader, which honors the payload.
In HoodieFileSliceReader:
```
public static HoodieFileSliceReader getFileSliceReader(
    Option<HoodieFileReader> baseFileReader, HoodieMergedLogRecordScanner scanner,
    Schema schema, Properties props,
    Option<Pair<String, String>> simpleKeyGenFieldsOpt) throws IOException {
  if (baseFileReader.isPresent()) {
    Iterator<HoodieRecord> baseIterator = baseFileReader.get().getRecordIterator(schema);
    while (baseIterator.hasNext()) {
      scanner.processNextRecord(baseIterator.next().wrapIntoHoodieRecordPayloadWithParams(
          schema, props, simpleKeyGenFieldsOpt, scanner.isWithOperationField(),
          scanner.getPartitionNameOverride(), false, Option.empty()));
    }
  }
  return new HoodieFileSliceReader(scanner.iterator());
}
```
In HoodieMergedLogRecordScanner:
```
@Override
public <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException {
  String key = newRecord.getRecordKey();
  HoodieRecord<T> prevRecord = records.get(key);
  if (prevRecord != null) {
    // Merge and store the combined record
    HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(
        prevRecord, readerSchema, newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
    // If pre-combine returns existing record, no need to update it
    if (combinedRecord.getData() != prevRecord.getData()) {
      HoodieRecord latestHoodieRecord = combinedRecord.newInstance(
          new HoodieKey(key, newRecord.getPartitionPath()), newRecord.getOperation());
      latestHoodieRecord.unseal();
      latestHoodieRecord.setCurrentLocation(newRecord.getCurrentLocation());
      latestHoodieRecord.seal();
      // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
      //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
      //       it since these records will be put into records(Map).
      records.put(key, latestHoodieRecord.copy());
    }
  } else {
    // Put the record as is
    // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
    //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
    //       it since these records will be put into records(Map).
    records.put(key, newRecord.copy());
  }
}
```
This is fundamentally wrong because the base file records are added to the scanner as new records only after all the log files have been scanned. The record merger therefore treats the base file record as the newer record in every merge.
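To make the ordering problem concrete, here is a minimal standalone sketch. The class and method names are hypothetical (not Hudi's real API), and it assumes a merger that resolves an equal precombine value in favor of the incoming ("new") record:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the ordering issue: records are merged in arrival
// order, the stored record is treated as "old" and the incoming one as "new".
public class MergeOrderSketch {

  // Stand-in for a HoodieRecord: a key, a precombine value, and a tag saying
  // whether the record came from a base file or a log file.
  static final class Rec {
    final String key;
    final long precombine;
    final String source;
    Rec(String key, long precombine, String source) {
      this.key = key;
      this.precombine = precombine;
      this.source = source;
    }
  }

  static final Map<String, Rec> records = new HashMap<>();

  // Mirrors the shape of processNextRecord above: the previously stored record
  // is "old", the incoming record is "new". Assumption: on an equal precombine
  // the merger keeps the incoming record.
  static void processNextRecord(Rec incoming) {
    Rec prev = records.get(incoming.key);
    if (prev == null || incoming.precombine >= prev.precombine) {
      records.put(incoming.key, incoming);
    }
  }

  static String winnerSource(String key) {
    return records.get(key).source;
  }

  public static void main(String[] args) {
    // Log files are scanned first...
    processNextRecord(new Rec("k1", 5L, "log"));
    // ...then getFileSliceReader feeds the base file records in as "new".
    processNextRecord(new Rec("k1", 5L, "base"));
    // Same precombine value, yet the older base-file record wins:
    System.out.println(winnerSource("k1")); // prints "base"
  }
}
```

Because the base file record arrives last, it is always the "new" side of the merge, so on a precombine tie it shadows the log record even though it is actually older.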
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]