danny0405 commented on code in PR #18040:
URL: https://github.com/apache/hudi/pull/18040#discussion_r2749048368


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordEmitter.java:
##########
@@ -18,19 +18,55 @@
 
 package org.apache.hudi.source.reader;
 
+import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.source.split.HoodieSourceSplit;
 
 /**
  * Default Hoodie record emitter.
- * @param <T>
+ *
+ * <p>This emitter handles watermark emission based on split information.
+ *
+ * @param <T> The type of records to emit
  */
 public class HoodieRecordEmitter<T> implements RecordEmitter<HoodieRecordWithPosition<T>, T, HoodieSourceSplit> {
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieRecordEmitter.class);
+  private HoodieSourceSplit lastSplit = null;
+  private long watermark = Long.MIN_VALUE;
 
   @Override
   public void emitRecord(HoodieRecordWithPosition<T> record, SourceOutput<T> output, HoodieSourceSplit split) throws Exception {
+    if (lastSplit == null || !split.splitId().equals(lastSplit.splitId())) {
+      long newWatermark = extractWatermark(split);
+      if (newWatermark < watermark) {
+        LOG.info(
+            "Received a new split with lower watermark. Previous watermark = {}, current watermark = {}, previous split = {}, current split = {}",

Review Comment:
   I also have a question: if the watermark is only emitted after the whole
   split has been read, we need a way to hook in a check for the end of split
   consumption. The current condition does not seem right:
   
   ```java
   lastSplit == null || !split.splitId().equals(lastSplit.splitId())
   ```
   
   Imagine the first split contains only 1 record: if no new split is
   consumed afterwards, the watermark will never be emitted.
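
To illustrate the concern, here is a minimal sketch that uses no Flink or Hudi classes. `Split`, `OnSplitChangeEmitter`, `OnSplitFinishedEmitter` and the `onSplitFinished` method on the toy emitter are hypothetical names made up for this example, and the first emitter only models the "emit once the previous split is known to be finished" reading of the condition above; it is not the PR's actual code. Flink's `SourceReaderBase` does have an `onSplitFinished` callback that might serve as the hook, but whether it fits this reader is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the two trigger conditions; no Flink or Hudi classes are used. */
public class WatermarkEmissionSketch {

  /** Hypothetical stand-in for a split: an id plus the watermark it carries. */
  static final class Split {
    final String id;
    final long watermark;

    Split(String id, long watermark) {
      this.id = id;
      this.watermark = watermark;
    }
  }

  /** Emits a split's watermark only when a record from a different split arrives. */
  static final class OnSplitChangeEmitter {
    private Split lastSplit = null;
    final List<Long> emitted = new ArrayList<>();

    void emitRecord(String record, Split split) {
      if (lastSplit != null && !split.id.equals(lastSplit.id)) {
        // The previous split is considered finished only because a new one showed up.
        emitted.add(lastSplit.watermark);
      }
      lastSplit = split;
    }
  }

  /** Emits a split's watermark from an explicit end-of-split callback (hypothetical hook). */
  static final class OnSplitFinishedEmitter {
    final List<Long> emitted = new ArrayList<>();

    void emitRecord(String record, Split split) {
      // Records are forwarded here; watermark handling is left to onSplitFinished.
    }

    void onSplitFinished(Split split) {
      emitted.add(split.watermark);
    }
  }

  /** One split with one record, and no further split afterwards. */
  static List<Long> runOnSplitChange() {
    Split only = new Split("split-0", 100L);
    OnSplitChangeEmitter emitter = new OnSplitChangeEmitter();
    emitter.emitRecord("r0", only);
    return emitter.emitted;
  }

  /** Same input, but the reader explicitly signals that the split is exhausted. */
  static List<Long> runOnSplitFinished() {
    Split only = new Split("split-0", 100L);
    OnSplitFinishedEmitter emitter = new OnSplitFinishedEmitter();
    emitter.emitRecord("r0", only);
    emitter.onSplitFinished(only);
    return emitter.emitted;
  }

  public static void main(String[] args) {
    System.out.println("split-change trigger emitted: " + runOnSplitChange());
    System.out.println("end-of-split hook emitted:    " + runOnSplitFinished());
  }
}
```

With a single one-record split, the split-change trigger never fires, while the explicit end-of-split callback emits the watermark once the split is exhausted.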


