[ 
https://issues.apache.org/jira/browse/HUDI-8915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938403#comment-17938403
 ] 

sivabalan narayanan commented on HUDI-8915:
-------------------------------------------

Nope, we still have the issue. 

Here is the root cause.

With HoodieMergeHandle, we use HoodieAvroRecordMerger as the merger class, and 
the payload is ExpressionPayload.

Here is what the call stack looks like:
{code:java}
Option<Pair<HoodieRecord, Schema>> mergeResult =
    recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props);{code}
{code:java}
// Here oldRecord is the valid previous record with ts 100, and newRecord is the
// record to be deleted, but with a lower ts of 99. {code}
{code:java}
// -> HoodieAvroRecordMerger#merge
@Override
public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema,
    HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
  return combineAndGetUpdateValue(older, newer, newSchema, props)
      .map(r -> Pair.of(new HoodieAvroIndexedRecord(r), r.getSchema()));
}{code}
 
{code:java}
// -> HoodieAvroRecordMerger#combineAndGetUpdateValue
private Option<IndexedRecord> combineAndGetUpdateValue(HoodieRecord older,
    HoodieRecord newer, Schema schema, Properties props) throws IOException {
  Option<IndexedRecord> previousAvroData =
      older.toIndexedRecord(schema, props).map(HoodieAvroIndexedRecord::getData);
  if (!previousAvroData.isPresent()) {
    return Option.empty();
  }

  return ((HoodieAvroRecord) newer).getData()
      .combineAndGetUpdateValue(previousAvroData.get(), schema, props);
}{code}
 
{code:java}
// -> The last line above invokes ExpressionPayload#combineAndGetUpdateValue {code}
 
{code:java}
override def combineAndGetUpdateValue(targetRecord: IndexedRecord,
                                      schema: Schema,
                                      properties: Properties): HOption[IndexedRecord] = {
  val recordSchema = getRecordSchema(properties)

  val sourceRecord = bytesToAvro(recordBytes, recordSchema)
  val joinedRecord = joinRecord(sourceRecord, targetRecord, properties)

  processMatchedRecord(ConvertibleRecord(joinedRecord), Some(targetRecord), properties)
} {code}

So ExpressionPayload never takes the ordering value into account. By the end 
of this combineAndGetUpdateValue we get a merged record, regardless of the 
ordering values of the two records.
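
To make the effect concrete, here is a minimal, self-contained sketch of that ordering-blind flow. It is only a simplified model: Rec, merge and the deleteConditionHolds flag are hypothetical stand-ins for illustration, not Hudi classes or the real ExpressionPayload logic.

```java
import java.util.Optional;

// Simplified model of the merge path described above. All names here are
// hypothetical stand-ins for illustration; none of this is Hudi code.
final class OrderingBlindMergeSketch {

    // A record reduced to its key and its ordering (event-time) value.
    static final class Rec {
        final int id;
        final long ts;

        Rec(int id, long ts) {
            this.id = id;
            this.ts = ts;
        }
    }

    // Mirrors the shape of the flow: the delete condition is evaluated and
    // applied, and the ordering values of the two records are never compared.
    static Optional<Rec> merge(Rec stored, Rec incoming, boolean deleteConditionHolds) {
        if (deleteConditionHolds) {
            return Optional.empty(); // delete wins regardless of ts
        }
        return Optional.of(incoming);
    }

    public static void main(String[] args) {
        Rec stored = new Rec(1, 100L);
        Rec staleDelete = new Rec(1, 99L);
        Optional<Rec> result = merge(stored, staleDelete, true);
        // The stored record is dropped even though the delete carries a lower ts.
        System.out.println(result.isPresent() ? "kept" : "deleted");
    }
}
```

With the stored record (id 1, ts 100) and an incoming delete (id 1, ts 99), this model prints "deleted", which is the behavior reported in this issue.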


{code:java}
// After the join, processMatchedRecord is called: {code}
{code:java}
if (resultRecordOpt == null) {
  // Process delete
  val deleteConditionText = properties.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION)
  if (deleteConditionText != null) {
    val (deleteConditionEvaluator, _) =
      getEvaluator(deleteConditionText.toString, inputRecord.asAvro.getSchema).head
    val deleteConditionEvalResult = deleteConditionEvaluator.apply(inputRecord.asRow)
      .get(0, BooleanType)
      .asInstanceOf[Boolean]
    if (deleteConditionEvalResult) {
      resultRecordOpt = HOption.empty()
    }
  }
}{code}
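{code:java}
// In the snippet above, deleteConditionEvalResult ends up as true, so
// resultRecordOpt is set to empty even though the incoming ts (99) is lower
// than the stored ts (100). {code}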
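
For contrast, here is a rough sketch of what an ordering-aware check could look like in the same simplified model. This is only an illustration under assumptions, not the fix for this issue: Rec and merge are hypothetical names, and letting the incoming record win on equal ts is an assumption made for the example.

```java
import java.util.Optional;

// Hypothetical ordering-aware variant of a simplified merge model.
// None of these names come from Hudi; this is a sketch, not the actual fix.
final class OrderingAwareDeleteSketch {

    // A record reduced to its key and its ordering (event-time) value.
    static final class Rec {
        final int id;
        final long ts;

        Rec(int id, long ts) {
            this.id = id;
            this.ts = ts;
        }
    }

    static Optional<Rec> merge(Rec stored, Rec incoming, boolean deleteConditionHolds) {
        // Guard: an incoming record with a strictly lower ordering value
        // cannot update or delete the stored record.
        // Assumption: on equal ts the incoming record is still applied.
        if (incoming.ts < stored.ts) {
            return Optional.of(stored);
        }
        if (deleteConditionHolds) {
            return Optional.empty();
        }
        return Optional.of(incoming);
    }

    public static void main(String[] args) {
        Rec stored = new Rec(1, 100L);
        Rec staleDelete = new Rec(1, 99L);
        Optional<Rec> result = merge(stored, staleDelete, true);
        // The stale delete is ignored, so the stored record survives.
        System.out.println(result.isPresent() ? "kept ts=" + result.get().ts : "deleted");
    }
}
```

With the same inputs as in the issue description, this variant prints "kept ts=100", while a delete with ts 101 would still remove the record.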
 

> COW MIT delete operations do not honor event time based ordering
> ----------------------------------------------------------------
>
>                 Key: HUDI-8915
>                 URL: https://issues.apache.org/jira/browse/HUDI-8915
>             Project: Apache Hudi
>          Issue Type: Sub-task
>            Reporter: Davis Zhang
>            Assignee: sivabalan narayanan
>            Priority: Blocker
>             Fix For: 1.0.2
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> If a COW table has the record
> (id 1, ts 100)
> and a MIT delete is issued with the record (id 1, ts 99), the stored record 
> should not be deleted.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
