This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2b806d5c074 [MINOR] Log wheather there has data files to be read in
Flink streaming read (#12278)
2b806d5c074 is described below
commit 2b806d5c074df6113bd825d9367622bbb00046ee
Author: zhuanshenbsj1 <[email protected]>
AuthorDate: Thu Dec 12 12:13:36 2024 +0800
[MINOR] Log wheather there has data files to be read in Flink streaming
read (#12278)
---
.../java/org/apache/hudi/source/StreamReadMonitoringFunction.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 6a847a4c1a1..bbbf39dab68 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -218,13 +218,14 @@ public class StreamReadMonitoringFunction
if (result.isEmpty() && StringUtils.isNullOrEmpty(result.getEndInstant()))
{
// no new instants, returns early
- LOG.warn("Result is empty, do not update issuedInstant.");
+ LOG.warn("No new instants to read for current run.");
return;
}
for (MergeOnReadInputSplit split : result.getInputSplits()) {
context.collect(split);
}
+
// update the issues instant time
this.issuedInstant = result.getEndInstant();
this.issuedOffset = result.getOffset();
@@ -234,6 +235,9 @@ public class StreamReadMonitoringFunction
+ "---------- consumed to instant: {}\n"
+ "------------------------------------------------------------",
conf.getString(FlinkOptions.TABLE_NAME), this.issuedInstant);
+ if (result.isEmpty()) {
+ LOG.warn("No new files to read for current run.");
+ }
}
@Override