fhan688 commented on code in PR #12166:
URL: https://github.com/apache/hudi/pull/12166#discussion_r1824140876
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java:
##########
@@ -214,8 +214,9 @@ public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> cont
}
IncrementalInputSplits.Result result =
incrementalInputSplits.inputSplits(metaClient, this.issuedOffset,
this.cdcEnabled);
- if (result.isEmpty()) {
+ if (result.equals(IncrementalInputSplits.Result.EMPTY)) {
Review Comment:
This bug is easy to reproduce. Take the UT
`TestStreamReadMonitoringFunction#testConsumeForSpeedLimitWhenEmptyCommitExists()`
as an example:
Step 1: create 4 empty commits.
Step 2: trigger a streaming read from the first instant with READ_COMMITS_LIMIT set to 2.
Step 3: assert that the current issuedOffset is not null.
This follows from how `IncrementalInputSplits#inputSplits` picks its start point:
`.startCompletionTime(issuedOffset != null ? issuedOffset : this.conf.getString(FlinkOptions.READ_START_COMMIT))`
If issuedOffset is still null after a poll, Hudi falls back to
FlinkOptions.READ_START_COMMIT again on the next poll, so the streaming read
never advances past the empty commits and is effectively blocked.
Without this PR the UT fails, which shows that a streaming read over empty
commits gets blocked.
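
To make the failure mode concrete, here is a minimal, self-contained sketch of the two checks. It is not Hudi code: `Result`, `inputSplits` and `monitorOnce` are simplified, hypothetical stand-ins, and the sketch assumes that `isEmpty()` only looks at the split list while an empty commit still yields a non-null end instant.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class EmptyCommitOffsetSketch {

  // Hypothetical stand-in for IncrementalInputSplits.Result.
  static final class Result {
    static final Result EMPTY = new Result(Collections.emptyList(), null);
    final List<String> splits;
    final String endInstant;

    Result(List<String> splits, String endInstant) {
      this.splits = splits;
      this.endInstant = endInstant;
    }

    // Assumption: emptiness is judged by the splits only.
    boolean isEmpty() {
      return splits.isEmpty();
    }
  }

  // Simplified model: every commit on the timeline is an empty commit,
  // so no splits are produced, but the end instant still advances.
  static Result inputSplits(List<String> timeline, String issuedOffset,
                            String startCommit, int commitsLimit) {
    List<String> consumed = new ArrayList<>();
    for (String instant : timeline) {
      boolean inRange = issuedOffset != null
          ? instant.compareTo(issuedOffset) > 0
          : instant.compareTo(startCommit) >= 0;
      if (inRange && consumed.size() < commitsLimit) {
        consumed.add(instant);
      }
    }
    if (consumed.isEmpty()) {
      return Result.EMPTY;
    }
    return new Result(Collections.emptyList(), consumed.get(consumed.size() - 1));
  }

  // One monitoring round; returns the issuedOffset after the round.
  static String monitorOnce(List<String> timeline, String issuedOffset,
                            String startCommit, int commitsLimit,
                            boolean useSentinelCheck) {
    Result result = inputSplits(timeline, issuedOffset, startCommit, commitsLimit);
    boolean nothingNew = useSentinelCheck
        ? result.equals(Result.EMPTY) // check used in this PR
        : result.isEmpty();           // previous check
    if (nothingNew) {
      return issuedOffset; // offset is not advanced
    }
    return result.endInstant;
  }

  public static void main(String[] args) {
    List<String> timeline = Arrays.asList("001", "002", "003", "004");
    System.out.println("isEmpty() check:  " + monitorOnce(timeline, null, "001", 2, false));
    System.out.println("EMPTY check:      " + monitorOnce(timeline, null, "001", 2, true));
  }
}
```

In this model the `isEmpty()` check leaves the offset at null after the first round, so every later round starts again from the configured start commit, while the sentinel comparison moves the offset to the second commit and lets the next round continue from there.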