fhan688 commented on code in PR #12166:
URL: https://github.com/apache/hudi/pull/12166#discussion_r1822195797
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java:
##########
@@ -214,8 +214,9 @@ public void
monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> cont
}
IncrementalInputSplits.Result result =
incrementalInputSplits.inputSplits(metaClient, this.issuedOffset,
this.cdcEnabled);
- if (result.isEmpty()) {
+ if (result.equals(IncrementalInputSplits.Result.EMPTY)) {
Review Comment:
In speed limit scenario, it can not be updated any more. I try to give an
example to explain this:
assume current active timeline is
instant1、instant2、instant3、instant4、instant5、instant6 ...
If the issuedInstant is instant2 and read.speed.limit=2 and instant3 and
instant4 are empty commits, then the inputSplits method return
`Result.instance(inputSplits, endInstant, offsetToIssue)`.
In this return, inputSplits is Collections.emptyList() , endInstant is
instant4, offsetToIssue is max(instant3 completionTime,
instant4.completionTime).
So result.isEmpty() is true and monitorDirAndForwardSplits() returned, but
offsetToIssue and issuedInstant are not updated. Next time when
monitorDirAndForwardSplits() is called, the inputSplits method return the
same as above, endless loop.
@danny0405
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]