jonvex commented on PR #9774:
URL: https://github.com/apache/hudi/pull/9774#issuecomment-1749673213

   > > if they have the same precombine, the base file records will be chosen 
over the log file records.
   > 
   > Can you show me where this is happening in the current code? As far as I know,
clustering will use the merged log record reader, which honors the payload.
   
   In HoodieFileSliceReader:
   ```
     public static HoodieFileSliceReader getFileSliceReader(
         Option<HoodieFileReader> baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, Properties props,
         Option<Pair<String, String>> simpleKeyGenFieldsOpt) throws IOException {
       if (baseFileReader.isPresent()) {
         Iterator<HoodieRecord> baseIterator = baseFileReader.get().getRecordIterator(schema);
         while (baseIterator.hasNext()) {
           scanner.processNextRecord(baseIterator.next().wrapIntoHoodieRecordPayloadWithParams(schema, props,
               simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionNameOverride(), false, Option.empty()));
         }
       }
       return new HoodieFileSliceReader(scanner.iterator());
     }
   ```
   In HoodieMergedLogRecordScanner:
   ```
     @Override
     public <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException {
       String key = newRecord.getRecordKey();
       HoodieRecord<T> prevRecord = records.get(key);
       if (prevRecord != null) {
         // Merge and store the combined record
         HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(prevRecord, readerSchema,
             newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
         // If pre-combine returns existing record, no need to update it
         if (combinedRecord.getData() != prevRecord.getData()) {
           HoodieRecord latestHoodieRecord =
               combinedRecord.newInstance(new HoodieKey(key, newRecord.getPartitionPath()), newRecord.getOperation());
   
           latestHoodieRecord.unseal();
           latestHoodieRecord.setCurrentLocation(newRecord.getCurrentLocation());
           latestHoodieRecord.seal();
   
           // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
           //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
           //       it since these records will be put into records(Map).
           records.put(key, latestHoodieRecord.copy());
         }
       } else {
         // Put the record as is
         // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
         //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
         //       it since these records will be put into records(Map).
         records.put(key, newRecord.copy());
       }
     }
   ```
   This is fundamentally wrong because the base file records are added to the
scanner as new records after all the log files have been scanned. The record
merger then treats them as the newer records, so when the precombine values are
equal, the base file record is chosen over the log file record.
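
To make the ordering issue concrete, here is a minimal standalone sketch. It is not Hudi code: `MergeOrderSketch`, `Rec`, `merge`, and `mergeInOrder` are hypothetical names, and the merger is a simplified stand-in that assumes ties on the ordering (precombine) value go to whichever record arrives later.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MergeOrderSketch {
    // Hypothetical stand-in for a record: key, ordering (precombine) value, and where it came from.
    static final class Rec {
        final String key;
        final long ordering;
        final String source;

        Rec(String key, long ordering, String source) {
            this.key = key;
            this.ordering = ordering;
            this.source = source;
        }
    }

    // Simplified merger (an assumption, not Hudi's implementation): the existing
    // record is kept only if its ordering value is strictly larger, so a tie
    // goes to the record that arrived later.
    static Rec merge(Rec older, Rec newer) {
        return older.ordering > newer.ordering ? older : newer;
    }

    // Feeds records into a keyed map in arrival order, merging on key collisions.
    static Map<String, Rec> mergeInOrder(List<Rec> arrivals) {
        Map<String, Rec> records = new LinkedHashMap<>();
        for (Rec incoming : arrivals) {
            // Map.merge passes (existing value, incoming value) to the remapping function.
            records.merge(incoming.key, incoming, MergeOrderSketch::merge);
        }
        return records;
    }

    public static void main(String[] args) {
        Rec base = new Rec("k1", 5L, "base");
        Rec log = new Rec("k1", 5L, "log");

        // Log records first, base records last: the base record is treated as newer.
        System.out.println(mergeInOrder(List.of(log, base)).get("k1").source);
        // Base records first, log records last: the log record is treated as newer.
        System.out.println(mergeInOrder(List.of(base, log)).get("k1").source);
    }
}
```

With equal ordering values, the winner depends only on arrival order, which is why feeding the base file records in after the log records lets them override the log records.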

