This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b806d5c074 [MINOR] Log whether there are data files to be read in 
Flink streaming read (#12278)
2b806d5c074 is described below

commit 2b806d5c074df6113bd825d9367622bbb00046ee
Author: zhuanshenbsj1 <[email protected]>
AuthorDate: Thu Dec 12 12:13:36 2024 +0800

    [MINOR] Log whether there are data files to be read in Flink streaming 
read (#12278)
---
 .../java/org/apache/hudi/source/StreamReadMonitoringFunction.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 6a847a4c1a1..bbbf39dab68 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -218,13 +218,14 @@ public class StreamReadMonitoringFunction
 
     if (result.isEmpty() && StringUtils.isNullOrEmpty(result.getEndInstant())) 
{
       // no new instants, returns early
-      LOG.warn("Result is empty, do not update issuedInstant.");
+      LOG.warn("No new instants to read for current run.");
       return;
     }
 
     for (MergeOnReadInputSplit split : result.getInputSplits()) {
       context.collect(split);
     }
+
     // update the issues instant time
     this.issuedInstant = result.getEndInstant();
     this.issuedOffset = result.getOffset();
@@ -234,6 +235,9 @@ public class StreamReadMonitoringFunction
             + "---------- consumed to instant: {}\n"
             + "------------------------------------------------------------",
         conf.getString(FlinkOptions.TABLE_NAME), this.issuedInstant);
+    if (result.isEmpty()) {
+      LOG.warn("No new files to read for current run.");
+    }
   }
 
   @Override

Reply via email to