xushiyan commented on code in PR #18040:
URL: https://github.com/apache/hudi/pull/18040#discussion_r2744113904


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordEmitter.java:
##########
@@ -18,19 +18,55 @@
 
 package org.apache.hudi.source.reader;
 
+import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.source.split.HoodieSourceSplit;
 
 /**
  * Default Hoodie record emitter.
- * @param <T>
+ *
+ * <p>This emitter handles watermark emission based on split information.
+ *
+ * @param <T> The type of records to emit
  */
 public class HoodieRecordEmitter<T> implements 
RecordEmitter<HoodieRecordWithPosition<T>, T, HoodieSourceSplit> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieRecordEmitter.class);
+  private HoodieSourceSplit lastSplit = null;
+  private long watermark = Long.MIN_VALUE;
 
   @Override
   public void emitRecord(HoodieRecordWithPosition<T> record, SourceOutput<T> 
output, HoodieSourceSplit split) throws Exception {
+    if (lastSplit == null || !split.splitId().equals(lastSplit.splitId())) {
+      long newWatermark = extractWatermark(split);
+      if (newWatermark < watermark) {
+        LOG.info(
+            "Received a new split with lower watermark. Previous watermark = 
{}, current watermark = {}, previous split = {}, current split = {}",
+            watermark,
+            newWatermark,
+            lastSplit,
+            split);
+      } else {
+        watermark = newWatermark;
+        output.emitWatermark(new Watermark(watermark));
+        LOG.debug("Watermark = {} emitted based on split = {}", watermark, 
split);
+      }
+
+      lastSplit = split;
+    }
+
     output.collect(record.record());
     split.updatePosition(record.fileOffset(), record.recordOffset());
   }
+
+  private long extractWatermark(HoodieSourceSplit split) {
+    // CDC split will be handled later
+    ValidationUtils.checkArgument(split.getBasePath().isPresent(), "Split base 
path can't be null.");
+    return Long.parseLong(FSUtils.getCommitTime(split.getBasePath().get()));

Review Comment:
   Since HoodieSourceSplit always has a latestCommit, shouldn't that be used directly instead?



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieRecordEmitter.java:
##########
@@ -197,18 +202,141 @@ public void testEmitRecordAcrossMultipleFiles() throws 
Exception {
   }
 
   /**
-   * Helper method to create a test HoodieSourceSplit.
+   * Helper method to create a test HoodieSourceSplit with specific parameters.
    */
-  private HoodieSourceSplit createTestSplit() {
+  private HoodieSourceSplit createTestSplitWithParams(String basePath, String 
latestCommit, String fileId) {

Review Comment:
   The first argument is named `basePath`, but it actually represents a file name.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordEmitter.java:
##########
@@ -18,19 +18,55 @@
 
 package org.apache.hudi.source.reader;
 
+import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.source.split.HoodieSourceSplit;
 
 /**
  * Default Hoodie record emitter.
- * @param <T>
+ *
+ * <p>This emitter handles watermark emission based on split information.
+ *
+ * @param <T> The type of records to emit
  */
 public class HoodieRecordEmitter<T> implements 
RecordEmitter<HoodieRecordWithPosition<T>, T, HoodieSourceSplit> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieRecordEmitter.class);
+  private HoodieSourceSplit lastSplit = null;
+  private long watermark = Long.MIN_VALUE;
 
   @Override
   public void emitRecord(HoodieRecordWithPosition<T> record, SourceOutput<T> 
output, HoodieSourceSplit split) throws Exception {
+    if (lastSplit == null || !split.splitId().equals(lastSplit.splitId())) {
+      long newWatermark = extractWatermark(split);
+      if (newWatermark < watermark) {
+        LOG.info(
+            "Received a new split with lower watermark. Previous watermark = 
{}, current watermark = {}, previous split = {}, current split = {}",

Review Comment:
   Wouldn't this INFO-level log be too noisy? It looks more like a DEBUG message.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordEmitter.java:
##########
@@ -18,19 +18,55 @@
 
 package org.apache.hudi.source.reader;
 
+import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.source.split.HoodieSourceSplit;
 
 /**
  * Default Hoodie record emitter.
- * @param <T>
+ *
+ * <p>This emitter handles watermark emission based on split information.
+ *
+ * @param <T> The type of records to emit
  */
 public class HoodieRecordEmitter<T> implements 
RecordEmitter<HoodieRecordWithPosition<T>, T, HoodieSourceSplit> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieRecordEmitter.class);
+  private HoodieSourceSplit lastSplit = null;
+  private long watermark = Long.MIN_VALUE;
 
   @Override
   public void emitRecord(HoodieRecordWithPosition<T> record, SourceOutput<T> 
output, HoodieSourceSplit split) throws Exception {
+    if (lastSplit == null || !split.splitId().equals(lastSplit.splitId())) {
+      long newWatermark = extractWatermark(split);
+      if (newWatermark < watermark) {
+        LOG.info(
+            "Received a new split with lower watermark. Previous watermark = 
{}, current watermark = {}, previous split = {}, current split = {}",
+            watermark,
+            newWatermark,
+            lastSplit,
+            split);
+      } else {
+        watermark = newWatermark;
+        output.emitWatermark(new Watermark(watermark));
+        LOG.debug("Watermark = {} emitted based on split = {}", watermark, 
split);
+      }
+
+      lastSplit = split;
+    }
+
     output.collect(record.record());
     split.updatePosition(record.fileOffset(), record.recordOffset());
   }
+
+  private long extractWatermark(HoodieSourceSplit split) {
+    // CDC split will be handled later
+    ValidationUtils.checkArgument(split.getBasePath().isPresent(), "Split base 
path can't be null.");

Review Comment:
   This check assumes the split may have no base path, while the comment implies such a split would be a CDC split. Should this check silently skip watermark extraction instead of throwing an exception? Or should CDC 
splits be explicitly excluded upstream? 
   
   
   
   



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordEmitter.java:
##########
@@ -18,19 +18,55 @@
 
 package org.apache.hudi.source.reader;
 
+import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.source.split.HoodieSourceSplit;
 
 /**
  * Default Hoodie record emitter.
- * @param <T>
+ *
+ * <p>This emitter handles watermark emission based on split information.
+ *
+ * @param <T> The type of records to emit
  */
 public class HoodieRecordEmitter<T> implements 
RecordEmitter<HoodieRecordWithPosition<T>, T, HoodieSourceSplit> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieRecordEmitter.class);
+  private HoodieSourceSplit lastSplit = null;
+  private long watermark = Long.MIN_VALUE;
 
   @Override
   public void emitRecord(HoodieRecordWithPosition<T> record, SourceOutput<T> 
output, HoodieSourceSplit split) throws Exception {
+    if (lastSplit == null || !split.splitId().equals(lastSplit.splitId())) {
+      long newWatermark = extractWatermark(split);
+      if (newWatermark < watermark) {
+        LOG.info(
+            "Received a new split with lower watermark. Previous watermark = 
{}, current watermark = {}, previous split = {}, current split = {}",
+            watermark,
+            newWatermark,
+            lastSplit,
+            split);
+      } else {
+        watermark = newWatermark;
+        output.emitWatermark(new Watermark(watermark));
+        LOG.debug("Watermark = {} emitted based on split = {}", watermark, 
split);
+      }
+
+      lastSplit = split;
+    }
+
     output.collect(record.record());
     split.updatePosition(record.fileOffset(), record.recordOffset());
   }
+
+  private long extractWatermark(HoodieSourceSplit split) {
+    // CDC split will be handled later
+    ValidationUtils.checkArgument(split.getBasePath().isPresent(), "Split base 
path can't be null.");

Review Comment:
   Consider adding explicit handling for CDC splits (even if it is just logging and 
skipping watermark emission).
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to