hudi-bot opened a new issue, #17372: URL: https://github.com/apache/hudi/issues/17372
if COW table has record (id 1, ts 100) do MIT delete with given record (id 1, ts 99). This should not be deleted. ## JIRA info - Link: https://issues.apache.org/jira/browse/HUDI-8915 - Type: Sub-task - Parent: https://issues.apache.org/jira/browse/HUDI-8722 - Fix version(s): - 1.1.0 --- ## Comments 06/Mar/25 21:59;yihua;This should already be fixed. I'll check the test coverage.;;; --- 26/Mar/25 02:25;shivnarayan;Nope, we still have the issue. here is the root cause. with HoodieMergeHandle, we are using HoodieAvroRecordMerger as the merger class. and the payload is ExpressionPayload. here is how the call stack looks like {code:java} Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props);{code} {code:java} // where oldRecord is valid prev record w/ ts 100. and newRecord is to be deleted record but w/ lower ts of 99. {code} {code:java} -> @Override public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException { return combineAndGetUpdateValue(older, newer, newSchema, props) .map(r -> Pair.of(new HoodieAvroIndexedRecord(r), r.getSchema())); }{code} {code:java} -> private Option<IndexedRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException { Option<IndexedRecord> previousAvroData = older.toIndexedRecord(schema, props).map(HoodieAvroIndexedRecord::getData); if (!previousAvroData.isPresent()) { return Option.empty(); } return ((HoodieAvroRecord) newer).getData().combineAndGetUpdateValue(previousAvroData.get(), schema, props); }{code} {code:java} -> the last line from above is invoked -> ExpressionPayload {code} {code:java} override def combineAndGetUpdateValue(targetRecord: IndexedRecord, schema: Schema, properties: Properties): HOption[IndexedRecord] = { val recordSchema = getRecordSchema(properties) val sourceRecord = bytesToAvro(recordBytes, recordSchema) val joinedRecord = joinRecord(sourceRecord, targetRecord, properties) processMatchedRecord(ConvertibleRecord(joinedRecord), Some(targetRecord), properties) } {code} So, in expression payload, we never account for ordering value. So, at the end of this combineAndGetUpdateValue, we get a merged record. {code:java} after merging, we call processMatchedRecord{code} {code:java} if (resultRecordOpt == null) { // Process delete val deleteConditionText = properties.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION) if (deleteConditionText != null) { val (deleteConditionEvaluator, _) = getEvaluator(deleteConditionText.toString, inputRecord.asAvro.getSchema).head val deleteConditionEvalResult = deleteConditionEvaluator.apply(inputRecord.asRow) .get(0, BooleanType) .asInstanceOf[Boolean] if (deleteConditionEvalResult) { resultRecordOpt = HOption.empty() } } }{code} {code:java} in above code snippet, deleteConditionEvalResult ends up as true. {code} ;;; --- 26/Mar/25 02:26;shivnarayan;Essentially, ExpressionPayload combineAndGetUpdate does not honor ordering value. guess its a known limitation. Just that our FileGroup reader for MOR table on the read code path takes care of accounting for ordering value as well when two versions of the record is merged. ;;; -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
