xushiyan commented on code in PR #18040:
URL: https://github.com/apache/hudi/pull/18040#discussion_r2749034548


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordEmitter.java:
##########
@@ -18,19 +18,75 @@
 
 package org.apache.hudi.source.reader;
 
+import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.source.split.HoodieSourceSplit;
 
 /**
  * Default Hoodie record emitter.
- * @param <T>
+ *
+ * <p>This emitter handles watermark emission based on split information.
+ *
+ * @param <T> The type of records to emit
  */
 public class HoodieRecordEmitter<T> implements 
RecordEmitter<HoodieRecordWithPosition<T>, T, HoodieSourceSplit> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieRecordEmitter.class);
+  private HoodieSourceSplit lastSplit = null;
+  private long watermark = Long.MIN_VALUE;
 
   @Override
   public void emitRecord(HoodieRecordWithPosition<T> record, SourceOutput<T> 
output, HoodieSourceSplit split) throws Exception {
+    if (lastSplit == null || !split.splitId().equals(lastSplit.splitId())) {
+      long newWatermark = extractWatermark(split);
+      if (newWatermark < watermark) {
+        LOG.warn(
+            "Received a new split with lower watermark. Previous watermark = 
{}, current watermark = {}, previous split = {}, current split = {}",
+            watermark,
+            newWatermark,
+            lastSplit,
+            split);
+      } else {
+        watermark = newWatermark;
+        output.emitWatermark(new Watermark(watermark));
+        LOG.debug("Watermark = {} emitted based on split = {}", watermark, 
split);
+      }
+
+      lastSplit = split;
+    }
+
     output.collect(record.record());
     split.updatePosition(record.fileOffset(), record.recordOffset());
   }
+
+  private long extractWatermark(HoodieSourceSplit split) {
+    long maxInstantTime = Long.MIN_VALUE;
+
+    if (split.getBasePath().isPresent()) {
+      String basePath = split.getBasePath().get();
+      try {
+        long baseCommitTime = Long.parseLong(FSUtils.getCommitTime(basePath));
+        maxInstantTime = Math.max(baseCommitTime, maxInstantTime);
+      } catch (NumberFormatException e) {
+        LOG.warn("Failed to parse commit time from basePath: {}", basePath, e);
+      }
+    }
+
+    if (split.getLogPaths().isPresent()) {
+      for (String logPath : split.getLogPaths().get()) {
+        try {
+          long logCommitTime = Long.parseLong(FSUtils.getCommitTime(logPath));
+          maxInstantTime = Math.max(logCommitTime, maxInstantTime);
+        } catch (NumberFormatException e) {
+          LOG.warn("Failed to parse commit time from logPath: {}", logPath, e);
+        }
+      }
+    }
+
+    return maxInstantTime;

Review Comment:
   > 
https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L409
   
   OK, I was confused by the name, as it's an instance variable of the source
split class. So the latest commit here actually means the record-reading
boundary for the file group reader to process; it's like an "up-to" timestamp.
So once the split is processed, it will actually be the right timestamp for
watermark emission, because no record later than that timestamp should be
processed. Is that right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to