fhan688 commented on code in PR #12166:
URL: https://github.com/apache/hudi/pull/12166#discussion_r1822195797


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java:
##########
@@ -214,8 +214,9 @@ public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> cont
     }
     IncrementalInputSplits.Result result =
         incrementalInputSplits.inputSplits(metaClient, this.issuedOffset, this.cdcEnabled);
-    if (result.isEmpty()) {
+    if (result.equals(IncrementalInputSplits.Result.EMPTY)) {

Review Comment:
   In the speed-limit scenario, the issued offset can no longer advance. Here is
   an example to explain this:
   
   Assume the current active timeline is
   instant1, instant2, instant3, instant4, instant5, instant6 ...
   If issuedInstant is instant2, read.speed.limit=2, and instant3 and instant4
   are empty commits, then the inputSplits method returns
   `Result.instance(inputSplits, endInstant, offsetToIssue)`.
   In this result, inputSplits is Collections.emptyList(), endInstant is
   instant4, and offsetToIssue is
   max(instant3.completionTime, instant4.completionTime).
   
   So result.isEmpty() is true and monitorDirAndForwardSplits() returns early,
   but offsetToIssue and issuedInstant are not updated. The next time
   monitorDirAndForwardSplits() is called, the inputSplits method returns the
   same result as above, so the reader never gets past instant4: an endless loop.
   @danny0405 
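
The scenario above can be reproduced with a small standalone model. This is only a sketch, not Hudi's code: `SpeedLimitStallSketch`, `Instant`, `Result`, `inputSplits`, `monitorOnce`, and `sampleTimeline` are hypothetical stand-ins, completion times are plain integers, and the speed limit is assumed to cap the number of instants consumed per call. It compares the old guard (`result.isEmpty()`) with the proposed one (`result.equals(Result.EMPTY)`).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SpeedLimitStallSketch {

    // Hypothetical stand-in for a commit on the timeline.
    static final class Instant {
        final String name;
        final long completionTime;
        final List<String> splits; // empty list models an empty commit

        Instant(String name, long completionTime, String... splits) {
            this.name = name;
            this.completionTime = completionTime;
            this.splits = Arrays.asList(splits);
        }
    }

    // Hypothetical stand-in for IncrementalInputSplits.Result.
    static final class Result {
        // Sentinel meaning "no new instants at all"; equals() is identity here.
        static final Result EMPTY = new Result(Collections.<String>emptyList(), "", -1L);

        final List<String> splits;
        final String endInstant;
        final long offset;

        Result(List<String> splits, String endInstant, long offset) {
            this.splits = splits;
            this.endInstant = endInstant;
            this.offset = offset;
        }

        boolean isEmpty() {
            return splits.isEmpty();
        }
    }

    // Consumes at most speedLimit instants completed after issuedOffset.
    static Result inputSplits(List<Instant> timeline, long issuedOffset, int speedLimit) {
        List<String> splits = new ArrayList<>();
        String endInstant = null;
        long offset = issuedOffset;
        int consumed = 0;
        for (Instant instant : timeline) {
            if (instant.completionTime <= issuedOffset) {
                continue; // already issued
            }
            if (consumed == speedLimit) {
                break; // speed limit reached for this round
            }
            splits.addAll(instant.splits);
            endInstant = instant.name;
            offset = Math.max(offset, instant.completionTime);
            consumed++;
        }
        return endInstant == null ? Result.EMPTY : new Result(splits, endInstant, offset);
    }

    // One monitoring round; returns the issued offset after the round.
    static long monitorOnce(List<Instant> timeline, long issuedOffset, int speedLimit,
                            boolean useSentinelCheck) {
        Result result = inputSplits(timeline, issuedOffset, speedLimit);
        boolean skip = useSentinelCheck ? result.equals(Result.EMPTY) : result.isEmpty();
        if (skip) {
            return issuedOffset; // early return: offset is left untouched
        }
        return result.offset; // splits (possibly none) forwarded, offset advanced
    }

    // instant3 and instant4 are empty commits, as in the example above.
    static List<Instant> sampleTimeline() {
        return Arrays.asList(
            new Instant("instant1", 1, "s1"), new Instant("instant2", 2, "s2"),
            new Instant("instant3", 3), new Instant("instant4", 4),
            new Instant("instant5", 5, "s5"), new Instant("instant6", 6, "s6"));
    }

    public static void main(String[] args) {
        List<Instant> timeline = sampleTimeline();
        System.out.println("old guard: " + monitorOnce(timeline, 2L, 2, false));
        System.out.println("new guard: " + monitorOnce(timeline, 2L, 2, true));
    }
}
```

In this model, the old guard leaves the offset at instant2's completion time on every round, while the sentinel check lets it move past the two empty commits so the next round can pick up instant5 and instant6.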


