1032851561 commented on issue #8087:
URL: https://github.com/apache/hudi/issues/8087#issuecomment-1477510961
> > I will fire a fix. But what I don't understand is why MailboxExecutor doesn't work as expected.
>
> We need to dig into the background before we fire a fix.

After debugging, I found that `MailboxExecutor` and `StreamReadOperator` run in the same thread, so this is not a bug. Each time `split_reader` receives a split, it immediately calls `enqueueProcessSplits` to submit a command to the `MailboxExecutor`, so the thread runs as follows:
```
time 1: receive split1
time 2: consumeAsMiniBatch(split1)
time 3: receive split2
time 4: consumeAsMiniBatch(split1)
time 5: consumeAsMiniBatch(split2)
time 6: receive split3
.......
time N: first barrier arrives (the checkpoint has already timed out)
```
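To make the interleaving concrete, here is a toy, single-threaded model. It is a simplification and not Flink's actual `MailboxProcessor` scheduling: the class `MailboxTrace`, the method `trace`, and the rule that the thread runs at most one pending mail (one mini-batch read) before handling each input element are all assumptions. Because every received split enqueues a read, reads are interleaved with input, and the barrier, which sits behind all splits in the channel, is only handled after them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical toy model, not Flink code: one thread alternates between
 * running one pending mail and handling one input element.
 */
public class MailboxTrace {

  /** Returns the order in which the single thread handles events. */
  public static List<String> trace(int numSplits, int batchesPerSplit) {
    Deque<String> input = new ArrayDeque<>();
    for (int i = 1; i <= numSplits; i++) {
      input.add("split" + i);
    }
    input.add("barrier"); // the barrier sits behind all splits in the channel

    Deque<String> pendingSplits = new ArrayDeque<>(); // received, not fully read
    Deque<Integer> remaining = new ArrayDeque<>();    // mini-batches left per split
    boolean mailQueued = false;                       // at most one read mail in flight
    List<String> events = new ArrayList<>();

    while (!input.isEmpty()) {
      // 1) Run one pending mail: read a single mini-batch of the head split.
      if (mailQueued) {
        String head = pendingSplits.peekFirst();
        events.add("consumeAsMiniBatch(" + head + ")");
        int left = remaining.pollFirst() - 1;
        if (left > 0) {
          remaining.addFirst(left);
        } else {
          pendingSplits.pollFirst(); // split fully read
        }
        mailQueued = !pendingSplits.isEmpty(); // re-enqueue while work remains
      }
      // 2) Handle one input element on the same thread.
      String element = input.pollFirst();
      if (element.equals("barrier")) {
        events.add("barrier");
      } else {
        events.add("receive " + element);
        pendingSplits.addLast(element);
        remaining.addLast(batchesPerSplit);
        mailQueued = true; // enqueueProcessSplits() on every split
      }
    }
    return events;
  }

  public static void main(String[] args) {
    trace(3, 2).forEach(System.out::println);
  }
}
```

With 3 splits of 2 mini-batches each, `trace(3, 2)` yields `receive split1`, `consumeAsMiniBatch(split1)`, `receive split2`, `consumeAsMiniBatch(split1)`, `receive split3`, `consumeAsMiniBatch(split2)`, `barrier`. This resembles, but does not exactly reproduce, the timeline above.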
We should not block the arrival of the barrier. Could we instead wait until the last split of a batch has been received before starting to read data?
```java
// org.apache.hudi.source.StreamReadMonitoringFunction#monitorDirAndForwardSplits
// Proposed: flag the last split of each monitoring round before forwarding it.
for (int i = 0; i < result.getInputSplits().size(); i++) {
  MergeOnReadInputSplit split = result.getInputSplits().get(i);
  split.setLast(i == result.getInputSplits().size() - 1);
  context.collect(split);
}

// org.apache.hudi.source.StreamReadOperator#processElement
// Proposed: buffer incoming splits and enqueue the read command only once the
// last split of the round has arrived, instead of once per split.
public void processElement(StreamRecord<MergeOnReadInputSplit> element) {
  splits.add(element.getValue());
  if (element.getValue().isLast()) {
    enqueueProcessSplits();
  }
}
```
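The effect of the proposal can be estimated with a small cost model. This is a hypothetical sketch, not Flink or Hudi code: `BarrierDelay`, `barrierTime`, and the unit costs are assumptions, and it reuses a simplified loop in which one thread runs at most one pending mail (one mini-batch read) before handling each input element. Receiving a split costs 1 unit, reading a mini-batch costs `readCost` units, and the barrier follows the last split in the input.

```java
/**
 * Hypothetical cost model comparing when the barrier is handled if reads are
 * enqueued on every split versus only on the split flagged as last.
 */
public class BarrierDelay {

  /**
   * @param numSplits         splits emitted before the barrier
   * @param batchesPerSplit   mini-batches needed to read one split
   * @param readCost          time units to read one mini-batch
   * @param enqueueOnLastOnly true for the proposed isLast() behaviour
   * @return time at which the barrier is handled (receiving costs 1 unit)
   */
  public static long barrierTime(int numSplits, int batchesPerSplit,
                                 long readCost, boolean enqueueOnLastOnly) {
    long time = 0;
    long pendingBatches = 0;    // read work buffered in the operator
    boolean mailQueued = false; // at most one read mail in flight
    for (int i = 1; i <= numSplits + 1; i++) { // element numSplits + 1 is the barrier
      if (mailQueued) {
        time += readCost;       // one mini-batch read on the task thread
        pendingBatches--;
        mailQueued = pendingBatches > 0;
      }
      time += 1;                // handle one input element
      if (i == numSplits + 1) {
        return time;            // barrier handled
      }
      pendingBatches += batchesPerSplit;
      boolean isLast = i == numSplits;
      if (!enqueueOnLastOnly || isLast) {
        mailQueued = true;      // enqueueProcessSplits()
      }
    }
    throw new IllegalStateException("unreachable");
  }

  public static void main(String[] args) {
    System.out.println(barrierTime(100, 4, 10, false)); // enqueue per split
    System.out.println(barrierTime(100, 4, 10, true));  // enqueue on last split
  }
}
```

With 100 splits, 4 mini-batches per split, and a read cost of 10 units, this model gives 1101 units until the barrier with per-split enqueueing and 111 with the `isLast()` variant. The total read work is unchanged; the change only stops reads from being interleaved ahead of the barrier, so in this model the barrier waits for at most one mini-batch read.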