This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e059c1bfb Flink: Backport #8553 to v1.15, v1.16 (#9145)
5e059c1bfb is described below

commit 5e059c1bfb664cc1880883425bfd68b2a8df3190
Author: pvary <[email protected]>
AuthorDate: Tue Nov 28 22:59:44 2023 +0100

    Flink: Backport #8553 to v1.15, v1.16 (#9145)
---
 .../apache/iceberg/flink/source/IcebergSource.java |  68 ++-
 .../reader/ColumnStatsWatermarkExtractor.java      |  98 +++++
 .../flink/source/reader/IcebergSourceReader.java   |   3 +-
 ...Emitter.java => SerializableRecordEmitter.java} |  24 +-
 .../source/reader/SplitWatermarkExtractor.java}    |  18 +-
 .../reader/WatermarkExtractorRecordEmitter.java    |  67 +++
 .../flink/source/split/SplitComparators.java       |  19 +-
 .../flink/source/TestIcebergSourceFailover.java    |  12 +-
 ...cebergSourceFailoverWithWatermarkExtractor.java | 112 +++++
 .../TestIcebergSourceWithWatermarkExtractor.java   | 481 +++++++++++++++++++++
 .../source/assigner/SplitAssignerTestBase.java     |  21 +-
 .../TestFileSequenceNumberBasedSplitAssigner.java  |  10 +-
 .../assigner/TestWatermarkBasedSplitAssigner.java  | 146 +++++++
 .../iceberg/flink/source/reader/ReaderUtil.java    |   8 +-
 .../reader/TestColumnStatsWatermarkExtractor.java  | 178 ++++++++
 .../source/reader/TestIcebergSourceReader.java     |   7 +-
 .../apache/iceberg/flink/source/IcebergSource.java |  68 ++-
 .../reader/ColumnStatsWatermarkExtractor.java      |  98 +++++
 .../flink/source/reader/IcebergSourceReader.java   |   3 +-
 .../source/reader/SerializableRecordEmitter.java}  |  24 +-
 .../source/reader/SplitWatermarkExtractor.java}    |  18 +-
 .../reader/WatermarkExtractorRecordEmitter.java    |  67 +++
 .../flink/source/split/SplitComparators.java       |  19 +-
 .../flink/source/TestIcebergSourceFailover.java    |  12 +-
 ...cebergSourceFailoverWithWatermarkExtractor.java | 112 +++++
 .../TestIcebergSourceWithWatermarkExtractor.java   | 451 +++++++++++++++++++
 .../source/assigner/SplitAssignerTestBase.java     |  21 +-
 .../TestFileSequenceNumberBasedSplitAssigner.java  |  10 +-
 .../assigner/TestWatermarkBasedSplitAssigner.java  | 146 +++++++
 .../iceberg/flink/source/reader/ReaderUtil.java    |   8 +-
 .../reader/TestColumnStatsWatermarkExtractor.java  | 178 ++++++++
 .../source/reader/TestIcebergSourceReader.java     |   7 +-
 32 files changed, 2408 insertions(+), 106 deletions(-)

diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index f85f627726..179253cb3a 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.connector.source.Boundedness;
@@ -58,15 +59,20 @@ import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl;
 import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
 import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
 import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
+import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
 import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
 import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
 import org.apache.iceberg.flink.source.reader.MetaDataReaderFunction;
 import org.apache.iceberg.flink.source.reader.ReaderFunction;
 import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
+import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter;
+import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
 import org.apache.iceberg.flink.source.split.SerializableComparator;
+import org.apache.iceberg.flink.source.split.SplitComparators;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,6 +86,7 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
   private final ReaderFunction<T> readerFunction;
   private final SplitAssignerFactory assignerFactory;
   private final SerializableComparator<IcebergSourceSplit> splitComparator;
+  private final SerializableRecordEmitter<T> emitter;
 
   // Can't use SerializableTable as enumerator needs a regular table
   // that can discover table changes
@@ -91,13 +98,15 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
       ReaderFunction<T> readerFunction,
       SplitAssignerFactory assignerFactory,
       SerializableComparator<IcebergSourceSplit> splitComparator,
-      Table table) {
+      Table table,
+      SerializableRecordEmitter<T> emitter) {
     this.tableLoader = tableLoader;
     this.scanContext = scanContext;
     this.readerFunction = readerFunction;
     this.assignerFactory = assignerFactory;
     this.splitComparator = splitComparator;
     this.table = table;
+    this.emitter = emitter;
   }
 
   String name() {
@@ -152,7 +161,8 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
   public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) {
     IcebergSourceReaderMetrics metrics =
         new IcebergSourceReaderMetrics(readerContext.metricGroup(), lazyTable().name());
-    return new IcebergSourceReader<>(metrics, readerFunction, splitComparator, readerContext);
+    return new IcebergSourceReader<>(
+        emitter, metrics, readerFunction, splitComparator, readerContext);
   }
 
   @Override
@@ -216,6 +226,8 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
     private Table table;
     private SplitAssignerFactory splitAssignerFactory;
     private SerializableComparator<IcebergSourceSplit> splitComparator;
+    private String watermarkColumn;
+    private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS;
     private ReaderFunction<T> readerFunction;
     private ReadableConfig flinkConfig = new Configuration();
     private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -237,6 +249,9 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
     }
 
     public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
+      Preconditions.checkArgument(
+          watermarkColumn == null,
+          "Watermark column and SplitAssigner should not be set in the same source");
       this.splitAssignerFactory = assignerFactory;
       return this;
     }
@@ -429,6 +444,33 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
       return this;
     }
 
+    /**
+     * Emits watermarks once per split based on the min value of column statistics from files
+     * metadata in the given split. The generated watermarks are also used for ordering the splits
+     * for read. Accepted column types are timestamp/timestamptz/long. For long columns consider
+     * setting {@link #watermarkTimeUnit(TimeUnit)}.
+     *
+     * <p>Consider setting `read.split.open-file-cost` to prevent combining small files to a single
+     * split when the watermark is used for watermark alignment.
+     */
+    public Builder<T> watermarkColumn(String columnName) {
+      Preconditions.checkArgument(
+          splitAssignerFactory == null,
+          "Watermark column and SplitAssigner should not be set in the same source");
+      this.watermarkColumn = columnName;
+      return this;
+    }
+
+    /**
+     * When the type of the {@link #watermarkColumn} is {@link
+     * org.apache.iceberg.types.Types.LongType}, then sets the {@link TimeUnit} to convert the
+     * value. The default value is {@link TimeUnit#MICROSECONDS}.
+     */
+    public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) {
+      this.watermarkTimeUnit = timeUnit;
+      return this;
+    }
+
     /** @deprecated Use {@link #setAll} instead. */
     @Deprecated
     public Builder<T> properties(Map<String, String> properties) {
@@ -453,6 +495,18 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
       }
 
+      SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
+      if (watermarkColumn != null) {
+        // Column statistics are needed for watermark generation
+        contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));
+
+        SplitWatermarkExtractor watermarkExtractor =
+            new ColumnStatsWatermarkExtractor(icebergSchema, watermarkColumn, watermarkTimeUnit);
+        emitter = SerializableRecordEmitter.emitterWithWatermark(watermarkExtractor);
+        splitAssignerFactory =
+            new OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor));
+      }
+
       ScanContext context = contextBuilder.build();
       if (readerFunction == null) {
         if (table instanceof BaseMetadataTable) {
@@ -485,8 +539,14 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
 
       checkRequired();
       // Since builder already load the table, pass it to the source to avoid double loading
-      return new IcebergSource<T>(
-          tableLoader, context, readerFunction, splitAssignerFactory, splitComparator, table);
+      return new IcebergSource<>(
+          tableLoader,
+          context,
+          readerFunction,
+          splitAssignerFactory,
+          splitComparator,
+          table,
+          emitter);
     }
 
     private void checkRequired() {
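
For orientation, a minimal sketch of how a pipeline might use the builder options added above; the warehouse path and the "event_ts" column name are placeholders, not part of this commit:

    import java.util.concurrent.TimeUnit;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.source.IcebergSource;

    public class WatermarkSourceSketch {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        IcebergSource<RowData> source =
            IcebergSource.forRowData()
                .tableLoader(TableLoader.fromHadoopTable("file:///tmp/warehouse/db/tbl"))
                .watermarkColumn("event_ts")              // timestamp/timestamptz/long column
                .watermarkTimeUnit(TimeUnit.MILLISECONDS) // only consulted for long columns
                .streaming(true)
                .build();

        DataStream<RowData> stream =
            env.fromSource(
                source,
                // the source emits split-based watermarks itself, so none are generated here
                WatermarkStrategy.noWatermarks(),
                "iceberg-source",
                TypeInformation.of(RowData.class));

        stream.print();
        env.execute();
      }
    }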
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java
new file mode 100644
index 0000000000..4bb6f0a98c
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type.TypeID;
+import org.apache.iceberg.types.Types;
+
+/**
+ * {@link SplitWatermarkExtractor} implementation which uses Iceberg timestamp column statistics
+ * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is emitted by the {@link
+ * WatermarkExtractorRecordEmitter} along with the actual records.
+ */
+@Internal
+public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, Serializable {
+  private final int eventTimeFieldId;
+  private final String eventTimeFieldName;
+  private final TimeUnit timeUnit;
+
+  /**
+   * Creates the extractor.
+   *
+   * @param schema The schema of the Table
+   * @param eventTimeFieldName The column which should be used as an event time
+   * @param timeUnit Used for converting the long value to epoch milliseconds
+   */
+  public ColumnStatsWatermarkExtractor(
+      Schema schema, String eventTimeFieldName, TimeUnit timeUnit) {
+    Types.NestedField field = schema.findField(eventTimeFieldName);
+    TypeID typeID = field.type().typeId();
+    Preconditions.checkArgument(
+        typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP),
+        "Found %s, expected a LONG or TIMESTAMP column for watermark 
generation.",
+        typeID);
+    this.eventTimeFieldId = field.fieldId();
+    this.eventTimeFieldName = eventTimeFieldName;
+    // Use the timeUnit only for Long columns.
+    this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : 
TimeUnit.MICROSECONDS;
+  }
+
+  @VisibleForTesting
  ColumnStatsWatermarkExtractor(int eventTimeFieldId, String eventTimeFieldName) {
+    this.eventTimeFieldId = eventTimeFieldId;
+    this.eventTimeFieldName = eventTimeFieldName;
+    this.timeUnit = TimeUnit.MICROSECONDS;
+  }
+
+  /**
+   * Get the watermark for a split using column statistics.
+   *
+   * @param split The split
+   * @return The watermark
+   * @throws IllegalArgumentException if there are no statistics for the column
+   */
+  @Override
+  public long extractWatermark(IcebergSourceSplit split) {
+    return split.task().files().stream()
+        .map(
+            scanTask -> {
+              Preconditions.checkArgument(
+                  scanTask.file().lowerBounds() != null
+                      && scanTask.file().lowerBounds().get(eventTimeFieldId) != null,
+                  "Missing statistics for column name = %s in file = %s",
+                  eventTimeFieldName,
+                  eventTimeFieldId,
+                  scanTask.file());
+              return timeUnit.toMillis(
+                  Conversions.fromByteBuffer(
+                      Types.LongType.get(), scanTask.file().lowerBounds().get(eventTimeFieldId)));
+            })
+        .min(Comparator.comparingLong(l -> l))
+        .get();
+  }
+}
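
To make the stats-based extraction above concrete, a rough sketch of the decoding step in isolation; the "fieldId" parameter stands in for the extractor's eventTimeFieldId, and a long column stored in microseconds is assumed:

    import java.nio.ByteBuffer;
    import java.util.concurrent.TimeUnit;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.types.Conversions;
    import org.apache.iceberg.types.Types;

    class LowerBoundDecodeSketch {
      // Reads the per-file lower bound of the watermark column and converts it to epoch millis
      static long watermarkMillis(DataFile file, int fieldId) {
        ByteBuffer lowerBound = file.lowerBounds().get(fieldId);
        Long micros = Conversions.fromByteBuffer(Types.LongType.get(), lowerBound);
        return TimeUnit.MICROSECONDS.toMillis(micros);
      }
    }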
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
index 8d7d68f961..f143b8d2df 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
@@ -35,13 +35,14 @@ public class IcebergSourceReader<T>
         RecordAndPosition<T>, T, IcebergSourceSplit, IcebergSourceSplit> {
 
   public IcebergSourceReader(
+      SerializableRecordEmitter<T> emitter,
       IcebergSourceReaderMetrics metrics,
       ReaderFunction<T> readerFunction,
       SerializableComparator<IcebergSourceSplit> splitComparator,
       SourceReaderContext context) {
     super(
         () -> new IcebergSourceSplitReader<>(metrics, readerFunction, splitComparator, context),
-        new IcebergSourceRecordEmitter<>(),
+        emitter,
         context.getConfiguration(),
         context);
   }
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/SerializableRecordEmitter.java
similarity index 61%
copy from flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
copy to flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/SerializableRecordEmitter.java
index 337d9d3c42..a6e2c1dae2 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/SerializableRecordEmitter.java
@@ -18,19 +18,23 @@
  */
 package org.apache.iceberg.flink.source.reader;
 
-import org.apache.flink.api.connector.source.SourceOutput;
+import java.io.Serializable;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 
-final class IcebergSourceRecordEmitter<T>
-    implements RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit> {
-
-  IcebergSourceRecordEmitter() {}
+@Internal
+@FunctionalInterface
+public interface SerializableRecordEmitter<T>
+    extends RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit>, Serializable {
+  static <T> SerializableRecordEmitter<T> defaultEmitter() {
+    return (element, output, split) -> {
+      output.collect(element.record());
+      split.updatePosition(element.fileOffset(), element.recordOffset());
+    };
+  }
 
-  @Override
-  public void emitRecord(
-      RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {
-    output.collect(element.record());
-    split.updatePosition(element.fileOffset(), element.recordOffset());
+  static <T> SerializableRecordEmitter<T> emitterWithWatermark(SplitWatermarkExtractor extractor) {
+    return new WatermarkExtractorRecordEmitter<>(extractor);
   }
 }
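
Since SerializableRecordEmitter is a functional interface, a custom emitter can be written as a lambda. A hypothetical sketch (the counter is illustrative only; the builder wires in the default or watermark emitter internally, so this just shows the functional shape):

    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter;

    class EmitterSketch {
      static SerializableRecordEmitter<RowData> countingEmitter(AtomicLong counter) {
        return (element, output, split) -> {
          // same contract as defaultEmitter(): forward the record and advance the split position
          output.collect(element.record());
          split.updatePosition(element.fileOffset(), element.recordOffset());
          counter.incrementAndGet();
        };
      }
    }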
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/SplitWatermarkExtractor.java
similarity index 63%
rename from flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
rename to flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/SplitWatermarkExtractor.java
index 337d9d3c42..d1c50ac8ca 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/SplitWatermarkExtractor.java
@@ -18,19 +18,11 @@
  */
 package org.apache.iceberg.flink.source.reader;
 
-import org.apache.flink.api.connector.source.SourceOutput;
-import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import java.io.Serializable;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 
-final class IcebergSourceRecordEmitter<T>
-    implements RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit> {
-
-  IcebergSourceRecordEmitter() {}
-
-  @Override
-  public void emitRecord(
-      RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {
-    output.collect(element.record());
-    split.updatePosition(element.fileOffset(), element.recordOffset());
-  }
+/** The interface used to extract watermarks from splits. */
+public interface SplitWatermarkExtractor extends Serializable {
+  /** Get the watermark for a split. */
+  long extractWatermark(IcebergSourceSplit split);
 }
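
The contract is deliberately small: one long per split, used both for ordering the splits and for the emitted watermark. A purely illustrative implementation, under the assumption that file sequence numbers can stand in for event time (a real extractor, like ColumnStatsWatermarkExtractor above, returns epoch milliseconds):

    import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
    import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

    class SequenceNumberWatermarkExtractor implements SplitWatermarkExtractor {
      @Override
      public long extractWatermark(IcebergSourceSplit split) {
        // smallest file sequence number across the files in the split
        return split.task().files().stream()
            .mapToLong(task -> task.file().fileSequenceNumber())
            .min()
            .orElseThrow(() -> new IllegalArgumentException("Empty split: " + split.splitId()));
      }
    }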
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java
new file mode 100644
index 0000000000..02ef57d344
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Emitter which emits the watermarks, records and updates the split position.
+ *
+ * <p>The Emitter emits watermarks at the beginning of every split provided by the {@link
+ * SplitWatermarkExtractor}.
+ */
+class WatermarkExtractorRecordEmitter<T> implements SerializableRecordEmitter<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(WatermarkExtractorRecordEmitter.class);
+  private final SplitWatermarkExtractor timeExtractor;
+  private String lastSplitId = null;
+  private long watermark;
+
+  WatermarkExtractorRecordEmitter(SplitWatermarkExtractor timeExtractor) {
+    this.timeExtractor = timeExtractor;
+  }
+
+  @Override
+  public void emitRecord(
+      RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {
+    if (!split.splitId().equals(lastSplitId)) {
+      long newWatermark = timeExtractor.extractWatermark(split);
+      if (newWatermark < watermark) {
+        LOG.info(
+            "Received a new split with lower watermark. Previous watermark = {}, current watermark = {}, previous split = {}, current split = {}",
+            watermark,
+            newWatermark,
+            lastSplitId,
+            split.splitId());
+      } else {
+        watermark = newWatermark;
+        output.emitWatermark(new Watermark(watermark));
+        LOG.debug("Watermark = {} emitted based on split = {}", watermark, lastSplitId);
+      }
+
+      lastSplitId = split.splitId();
+    }
+
+    output.collect(element.record());
+    split.updatePosition(element.fileOffset(), element.recordOffset());
+  }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
index 64e03d77de..56ee92014d 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.source.split;
 
+import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 /**
@@ -45,7 +46,7 @@ public class SplitComparators {
           o1);
       Preconditions.checkNotNull(
           seq2,
-          "IInvalid file sequence number: null. Doesn't support splits written 
with V1 format: %s",
+          "Invalid file sequence number: null. Doesn't support splits written 
with V1 format: %s",
           o2);
 
       int temp = Long.compare(seq1, seq2);
@@ -56,4 +57,20 @@ public class SplitComparators {
       }
     };
   }
+
+  /** Comparator which orders the splits based on watermark of the splits */
+  public static SerializableComparator<IcebergSourceSplit> watermark(
+      SplitWatermarkExtractor watermarkExtractor) {
+    return (IcebergSourceSplit o1, IcebergSourceSplit o2) -> {
+      long watermark1 = watermarkExtractor.extractWatermark(o1);
+      long watermark2 = watermarkExtractor.extractWatermark(o2);
+
+      int temp = Long.compare(watermark1, watermark2);
+      if (temp != 0) {
+        return temp;
+      } else {
+        return o1.splitId().compareTo(o2.splitId());
+      }
+    };
+  }
 }
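
Illustrative use of the new comparator, mirroring what the builder does when a watermark column is set (it wraps this comparator in an OrderedSplitAssignerFactory); "schema", the "ts" column, and "splits" are assumed inputs:

    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
    import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
    import org.apache.iceberg.flink.source.split.SerializableComparator;
    import org.apache.iceberg.flink.source.split.SplitComparators;

    class WatermarkOrderingSketch {
      static void sortByWatermark(Schema schema, List<IcebergSourceSplit> splits) {
        SerializableComparator<IcebergSourceSplit> comparator =
            SplitComparators.watermark(
                new ColumnStatsWatermarkExtractor(schema, "ts", TimeUnit.MICROSECONDS));
        splits.sort(comparator); // oldest data first; ties broken by split id
      }
    }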
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
index 70e7a79d83..7d991ee603 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
@@ -97,6 +98,11 @@ public class TestIcebergSourceFailover {
     return RandomGenericData.generate(schema(), numRecords, seed);
   }
 
+  protected void assertRecords(Table table, List<Record> expectedRecords, Duration timeout)
+      throws Exception {
+    SimpleDataUtil.assertTableRecords(table, expectedRecords, timeout);
+  }
+
   @Test
   public void testBoundedWithTaskManagerFailover() throws Exception {
     testBoundedIcebergSource(FailoverType.TM);
@@ -150,8 +156,7 @@ public class TestIcebergSourceFailover {
         RecordCounterToFail::continueProcessing,
         miniClusterResource.getMiniCluster());
 
-    SimpleDataUtil.assertTableRecords(
-        sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+    assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
   }
 
   @Test
@@ -214,8 +219,7 @@ public class TestIcebergSourceFailover {
 
     // wait longer for continuous source to reduce flakiness
     // because CI servers tend to be overloaded.
-    SimpleDataUtil.assertTableRecords(
-        sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+    assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
   }
 
   // ------------------------------------------------------------------------
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java
new file mode 100644
index 0000000000..f7dc931c50
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+
+public class TestIcebergSourceFailoverWithWatermarkExtractor extends TestIcebergSourceFailover {
+  // Increment ts by 15 minutes for each generateRecords batch
+  private static final long RECORD_BATCH_TS_INCREMENT_MILLI = TimeUnit.MINUTES.toMillis(15);
+  // Within a batch, increment ts by 1 second
+  private static final long RECORD_TS_INCREMENT_MILLI = TimeUnit.SECONDS.toMillis(1);
+
+  private final AtomicLong tsMilli = new AtomicLong(System.currentTimeMillis());
+
+  @Override
+  protected IcebergSource.Builder<RowData> sourceBuilder() {
+    return IcebergSource.<RowData>builder()
+        .tableLoader(sourceTableResource.tableLoader())
+        .watermarkColumn("ts")
+        .project(TestFixtures.TS_SCHEMA);
+  }
+
+  @Override
+  protected Schema schema() {
+    return TestFixtures.TS_SCHEMA;
+  }
+
+  @Override
+  protected List<Record> generateRecords(int numRecords, long seed) {
+    // Override the ts field to create a more realistic situation for event time alignment
+    tsMilli.addAndGet(RECORD_BATCH_TS_INCREMENT_MILLI);
+    return RandomGenericData.generate(schema(), numRecords, seed).stream()
+        .peek(
+            record -> {
+              LocalDateTime ts =
+                  LocalDateTime.ofInstant(
+                      Instant.ofEpochMilli(tsMilli.addAndGet(RECORD_TS_INCREMENT_MILLI)),
+                      ZoneId.of("Z"));
+              record.setField("ts", ts);
+            })
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * This override is needed because {@link Comparators} used by {@link StructLikeWrapper} retrieves
+   * Timestamp type using Long type as inner class, while the {@link RandomGenericData} generates
+   * {@link LocalDateTime} for {@code TimestampType.withoutZone()}. This method normalizes the
+   * {@link LocalDateTime} to a Long type so that Comparators can continue to work.
+   */
+  @Override
+  protected void assertRecords(Table table, List<Record> expectedRecords, Duration timeout)
+      throws Exception {
+    List<Record> expectedNormalized = convertLocalDateTimeToMilli(expectedRecords);
+    Awaitility.await("expected list of records should be produced")
+        .atMost(timeout)
+        .untilAsserted(
+            () -> {
+              SimpleDataUtil.equalsRecords(
+                  expectedNormalized,
+                  convertLocalDateTimeToMilli(SimpleDataUtil.tableRecords(table)),
+                  table.schema());
+              SimpleDataUtil.assertRecordsEqual(
+                  expectedNormalized,
+                  convertLocalDateTimeToMilli(SimpleDataUtil.tableRecords(table)),
+                  table.schema());
+            });
+  }
+
+  private List<Record> convertLocalDateTimeToMilli(List<Record> records) {
+    return records.stream()
+        .peek(
+            r -> {
+              LocalDateTime localDateTime = ((LocalDateTime) r.getField("ts"));
+              r.setField("ts", 
localDateTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli());
+            })
+        .collect(Collectors.toList());
+  }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
new file mode 100644
index 0000000000..7547323871
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+  private static final ConcurrentMap<Long, Integer> windows = Maps.newConcurrentMap();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA);
+
+  /**
+   * This is an integration test for watermark handling and windowing. Integration testing the
+   * following features:
+   *
+   * <ul>
+   *   <li>- Ordering of the splits
+   *   <li>- Emitting of watermarks
+   *   <li>- Firing windows based on watermarks
+   * </ul>
+   *
+   * <p>The test generates 4 splits
+   *
+   * <ul>
+   *   <li>- Split 1 - Watermark 100 min
+   *   <li>- Split 2, 3 - Watermark 0 min
+   *   <li>- Split 4 - Watermark 6 min
+   * </ul>
+   *
+   * <p>Creates a source with 5 minutes tumbling window with parallelism 1 (to prevent concurrency
+   * issues).
+   *
+   * <p>Checks that windows are handled correctly based on the emitted watermarks, and splits are
+   * read in the following order:
+   *
+   * <ul>
+   *   <li>- Split 2, 3
+   *   <li>- Split 4
+   *   <li>- Split 1
+   * </ul>
+   *
+   * <p>As a result the window aggregator emits the records based on the Split 2-3 and Split 4 data.
+   *
+   * <p>Add 2 more splits, so the task manager closes the windows for the original 4 splits and
+   * emits the appropriate aggregated records.
+   */
+  @Test
+  public void testWindowing() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    //    - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    // - File 3 - Parallel write for the first records (Watermark 360000)
+    //    - Split 1 - 2 records (6, "file_3-recordTs_6"), (7, "file_3-recordTs_7")
+    List<Record> batch =
+        ImmutableList.of(
+            generateRecord(100, "file_1-recordTs_100"),
+            generateRecord(101, "file_1-recordTs_101"),
+            generateRecord(103, "file_1-recordTs_103"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      // Generate records where the timestamps are out of order, but still between 0-5 minutes
+      batch.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
+    }
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch =
+        ImmutableList.of(
+            generateRecord(6, "file_3-recordTs_6"), generateRecord(7, "file_3-recordTs_7"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(1);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            source(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withTimestampAssigner(new RowDataTimestampAssigner()),
+            SOURCE_NAME,
+            TypeInformation.of(RowData.class));
+
+    stream
+        .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
+        .apply(
+            new AllWindowFunction<RowData, RowData, TimeWindow>() {
+              @Override
+              public void apply(
+                  TimeWindow window, Iterable<RowData> values, Collector<RowData> out) {
+                // Emit RowData which contains the window start time, and the record count in
+                // that window
+                AtomicInteger count = new AtomicInteger(0);
+                values.forEach(a -> count.incrementAndGet());
+                out.collect(row(window.getStart(), count.get()));
+                windows.put(window.getStart(), count.get());
+              }
+            });
+
+    // Use static variable to collect the windows, since other solutions were flaky
+    windows.clear();
+    env.executeAsync("Iceberg Source Windowing Test");
+
+    // Wait for the 2 first windows from File 2 and File 3
+    Awaitility.await()
+        .pollInterval(Duration.ofMillis(10))
+        .atMost(30, TimeUnit.SECONDS)
+        .until(
+            () ->
+                windows.equals(
+                    ImmutableMap.of(0L, RECORD_NUM_FOR_2_SPLITS, TimeUnit.MINUTES.toMillis(5), 2)));
+
+    // Write data so the windows containing test data are closed
+    dataAppender.appendToTable(
+        dataAppender.writeFile(ImmutableList.of(generateRecord(1500, "last-record"))));
+
+    // Wait for last test record window from File 1
+    Awaitility.await()
+        .pollInterval(Duration.ofMillis(10))
+        .atMost(30, TimeUnit.SECONDS)
+        .until(
+            () ->
+                windows.equals(
+                    ImmutableMap.of(
+                        0L,
+                        RECORD_NUM_FOR_2_SPLITS,
+                        TimeUnit.MINUTES.toMillis(5),
+                        2,
+                        TimeUnit.MINUTES.toMillis(100),
+                        3)));
+  }
+
+  /**
+   * This is an integration test for watermark handling and throttling. Integration testing the
+   * following:
+   *
+   * <ul>
+   *   <li>- Emitting of watermarks
+   *   <li>- Watermark alignment
+   * </ul>
+   *
+   * <p>The test generates 3 splits
+   *
+   * <ul>
+   *   <li>- Split 1 - Watermark 100 min
+   *   <li>- Split 2, 3 - Watermark 0 min
+   * </ul>
+   *
+   * The splits are read in the following order:
+   *
+   * <ul>
+   *   <li>- Split 2, 3 (Task Manager 1, Task Manager 2)
+   *   <li>- Split 1 (Task Manager 1 or Task Manager 2 depending on scheduling)
+   * </ul>
+   *
+   * Reading split 1 will cause the watermark alignment to pause reading for the given task manager.
+   *
+   * <p>The status of the watermark alignment is checked by the alignment related metrics.
+   *
+   * <p>Adding new records with old timestamps to the table will enable the running reader to
+   * continue reading the files, but the watermark alignment will still prevent the paused reader
+   * from continuing.
+   *
+   * <p>After adding some records with new timestamps the blocked reader is un-paused, and both of
+   * the readers continue reading.
+   */
+  @Test
+  public void testThrottling() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    //    - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    List<Record> batch =
+        ImmutableList.of(
+            generateRecord(100, "file_1-recordTs_100"), generateRecord(103, "file_1-recordTs_103"));
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
+    }
+
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(2);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            source(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withWatermarkAlignment("iceberg", Duration.ofMinutes(20), 
Duration.ofMillis(10)),
+            SOURCE_NAME,
+            TypeInformation.of(RowData.class));
+
+    // Flink 1.15 only change - start
+    CollectResultIterator<RowData> resultStream = addCollectSink(stream);
+
+    // Start the job
+    JobClient jobClient = env.executeAsync("Iceberg Source Throttling Test");
+    resultStream.setJobClient(jobClient);
+    try (CollectResultIterator<RowData> resultIterator = resultStream) {
+      // Flink 1.15 only change - end
+
+      // Check that we read the non-blocked data
+      // The first RECORD_NUM_FOR_2_SPLITS records should be read
+      // 1 or more records from the runaway reader should arrive depending on thread scheduling
+      waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1);
+
+      // Get the drift metric, wait for it to be created and reach the expected state
+      // (100 min - 20 min - 0 min)
+      // Also this validates that the WatermarkAlignment is working
+      Awaitility.await()
+          .pollInterval(Duration.ofMillis(10))
+          .atMost(30, TimeUnit.SECONDS)
+          .until(
+              () ->
+                  findAlignmentDriftMetric(jobClient.getJobID(), TimeUnit.MINUTES.toMillis(80))
+                      .isPresent());
+      Gauge<Long> drift =
+          findAlignmentDriftMetric(jobClient.getJobID(), TimeUnit.MINUTES.toMillis(80)).get();
+
+      // Add some old records with 2 splits, so even if the blocked reader gets one split, the
+      // other reader gets one as well
+      List<Record> newBatch1 =
+          ImmutableList.of(
+              generateRecord(15, "file_3-recordTs_15"),
+              generateRecord(16, "file_3-recordTs_16"),
+              generateRecord(17, "file_3-recordTs_17"));
+      List<Record> newBatch2 =
+          ImmutableList.of(
+              generateRecord(15, "file_4-recordTs_15"),
+              generateRecord(16, "file_4-recordTs_16"),
+              generateRecord(17, "file_4-recordTs_17"));
+      dataAppender.appendToTable(
+          dataAppender.writeFile(newBatch1), dataAppender.writeFile(newBatch2));
+      // The records received will highly depend on scheduling
+      // We minimally get 3 records from the non-blocked reader
+      // We might get 1 record from the blocked reader (as part of the previous batch -
+      // file_1-recordTs_100)
+      // We might get 3 records from the non-blocked reader if it gets both new splits
+      waitForRecords(resultIterator, 3);
+
+      // Get the drift metric, wait for it to be created and reach the expected state (100 min - 20
+      // min - 15 min)
+      Awaitility.await()
+          .pollInterval(Duration.ofMillis(10))
+          .atMost(30, TimeUnit.SECONDS)
+          .until(() -> drift.getValue() == TimeUnit.MINUTES.toMillis(65));
+
+      // Add some new records which should unblock the throttled reader
+      batch =
+          ImmutableList.of(
+              generateRecord(90, "file_5-recordTs_90"), generateRecord(91, "file_5-recordTs_91"));
+      dataAppender.appendToTable(batch);
+      // We should get all the records at this point
+      waitForRecords(resultIterator, 6);
+
+      // Wait for the new drift to decrease below the allowed drift to signal the normal state
+      Awaitility.await()
+          .pollInterval(Duration.ofMillis(10))
+          .atMost(30, TimeUnit.SECONDS)
+          .until(() -> drift.getValue() < TimeUnit.MINUTES.toMillis(20));
+    }
+  }
+
+  protected IcebergSource<RowData> source() {
+    return IcebergSource.<RowData>builder()
+        .tableLoader(sourceTableResource.tableLoader())
+        .watermarkColumn("ts")
+        .project(TestFixtures.TS_SCHEMA)
+        .splitSize(100L)
+        .streaming(true)
+        .monitorInterval(Duration.ofMillis(2))
+        .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+        .build();
+  }
+
+  protected Record generateRecord(int minutes, String str) {
+    // Override the ts field to create a more realistic situation for event time alignment
+    Record record = GenericRecord.create(TestFixtures.TS_SCHEMA);
+    LocalDateTime ts =
+        LocalDateTime.ofInstant(
+            Instant.ofEpochMilli(Time.of(minutes, TimeUnit.MINUTES).toMilliseconds()),
+            ZoneId.of("Z"));
+    record.setField("ts", ts);
+    record.setField("str", str);
+    return record;
+  }
+
+  protected void assertRecords(
+      Collection<Record> expectedRecords, CloseableIterator<RowData> iterator) throws Exception {
+    Set<RowData> expected =
+        expectedRecords.stream()
+            .map(e -> RowDataConverter.convert(TestFixtures.TS_SCHEMA, e))
+            .collect(Collectors.toSet());
+    Assert.assertEquals(expected, waitForRecords(iterator, expectedRecords.size()));
+  }
+
+  protected Set<RowData> waitForRecords(CloseableIterator<RowData> iterator, int num) {
+    Set<RowData> received = Sets.newHashSetWithExpectedSize(num);
+    assertThat(
+            CompletableFuture.supplyAsync(
+                () -> {
+                  int count = 0;
+                  while (count < num && iterator.hasNext()) {
+                    received.add(iterator.next());
+                    count++;
+                  }
+
+                  if (count < num) {
+                    throw new IllegalStateException(String.format("Fail to get 
%d records.", num));
+                  }
+
+                  return true;
+                }))
+        .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
+
+    return received;
+  }
+
+  private Optional<Gauge<Long>> findAlignmentDriftMetric(JobID jobID, long withValue) {
+    String metricsName = SOURCE_NAME + ".*" + MetricNames.WATERMARK_ALIGNMENT_DRIFT;
+    return reporter.findMetrics(jobID, metricsName).values().stream()
+        .map(m -> (Gauge<Long>) m)
+        .filter(m -> m.getValue() == withValue)
+        .findFirst();
+  }
+
+  private GenericAppenderHelper appender() {
+    // We need to create multiple splits, so we need to generate parquet files with multiple offsets
+    org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+    hadoopConf.set("write.parquet.page-size-bytes", "64");
+    hadoopConf.set("write.parquet.row-group-size-bytes", "64");
+    return new GenericAppenderHelper(
+        sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER, hadoopConf);
+  }
+
+  private static RowData row(long time, long count) {
+    GenericRowData result = new GenericRowData(2);
+    result.setField(0, time);
+    result.setField(1, String.valueOf(count));
+    return result;
+  }
+
+  private static class RowDataTimestampAssigner implements SerializableTimestampAssigner<RowData> {
+    @Override
+    public long extractTimestamp(RowData element, long recordTimestamp) {
+      return element.getTimestamp(0, 0).getMillisecond();
+    }
+  }
+
+  // Flink 1.15 only method
+  private CollectResultIterator<RowData> addCollectSink(DataStream<RowData> stream) {
+    TypeSerializer<RowData> serializer =
+        stream.getType().createSerializer(stream.getExecutionConfig());
+    String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+    CollectSinkOperatorFactory<RowData> factory =
+        new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+    CollectSinkOperator<RowData> operator = (CollectSinkOperator<RowData>) factory.getOperator();
+    CollectStreamSink<RowData> sink = new CollectStreamSink<>(stream, factory);
+    sink.name("Data stream collect sink");
+    stream.getExecutionEnvironment().addOperator(sink.getTransformation());
+    return new CollectResultIterator<>(
+        operator.getOperatorIdFuture(),
+        serializer,
+        accumulatorName,
+        stream.getExecutionEnvironment().getCheckpointConfig());
+  }
+}
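
A back-of-the-envelope check of the drift values asserted in testThrottling() above (explanatory only, not part of the commit):

    import java.util.concurrent.TimeUnit;

    class DriftMathSketch {
      public static void main(String[] args) {
        // With a max allowed drift of 20 min, a runaway split at 100 min, and the global
        // minimum watermark at 0 min, the alignment drift gauge settles at 100 - 20 - 0 = 80 min.
        long drift =
            TimeUnit.MINUTES.toMillis(100)
                - TimeUnit.MINUTES.toMillis(20)
                - TimeUnit.MINUTES.toMillis(0);
        System.out.println(drift); // 4800000 ms, the value probed by findAlignmentDriftMetric
      }
    }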
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
index f28677ca9d..090b304942 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
@@ -43,15 +43,13 @@ public abstract class SplitAssignerTestBase {
   @Test
   public void testStaticEnumeratorSequence() throws Exception {
     SplitAssigner assigner = splitAssigner();
-    assigner.onDiscoveredSplits(
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 1));
+    assigner.onDiscoveredSplits(createSplits(4, 1, "1"));
 
     assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
     assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
     assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
     assertSnapshot(assigner, 1);
-    assigner.onUnassignedSplits(
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1));
+    assigner.onUnassignedSplits(createSplits(1, 1, "1"));
     assertSnapshot(assigner, 2);
 
     assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
@@ -66,15 +64,12 @@ public abstract class SplitAssignerTestBase {
     SplitAssigner assigner = splitAssigner();
     assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
 
-    List<IcebergSourceSplit> splits1 =
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    List<IcebergSourceSplit> splits1 = createSplits(1, 1, "1");
     assertAvailableFuture(assigner, 1, () -> assigner.onDiscoveredSplits(splits1));
-    List<IcebergSourceSplit> splits2 =
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    List<IcebergSourceSplit> splits2 = createSplits(1, 1, "1");
     assertAvailableFuture(assigner, 1, () -> assigner.onUnassignedSplits(splits2));
 
-    assigner.onDiscoveredSplits(
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 2, 1));
+    assigner.onDiscoveredSplits(createSplits(2, 1, "1"));
     assertSnapshot(assigner, 2);
     assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
     assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
@@ -125,5 +120,11 @@ public abstract class SplitAssignerTestBase {
     Assert.assertEquals(splitCount, stateBeforeGet.size());
   }
 
+  protected List<IcebergSourceSplit> createSplits(int fileCount, int filesPerSplit, String version)
+      throws Exception {
+    return SplitHelpers.createSplitsFromTransientHadoopTable(
+        TEMPORARY_FOLDER, fileCount, filesPerSplit, version);
+  }
+
   protected abstract SplitAssigner splitAssigner();
 }
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
index 8b9e132e0e..e78634e6b8 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.flink.source.assigner;
 
 import java.util.List;
 import org.apache.iceberg.ContentFile;
-import org.apache.iceberg.flink.source.SplitHelpers;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.flink.source.split.SerializableComparator;
 import org.apache.iceberg.flink.source.split.SplitComparators;
@@ -40,9 +39,7 @@ public class TestFileSequenceNumberBasedSplitAssigner extends 
SplitAssignerTestB
   public void testMultipleFilesInAnIcebergSplit() {
     SplitAssigner assigner = splitAssigner();
     Assertions.assertThatThrownBy(
-            () ->
-                assigner.onDiscoveredSplits(
-                    SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 2, "2")),
+            () -> assigner.onDiscoveredSplits(createSplits(4, 2, "2")),
             "Multiple files in a split is not allowed")
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessageContaining("Please use 'split-open-file-cost'");
@@ -52,8 +49,7 @@ public class TestFileSequenceNumberBasedSplitAssigner extends 
SplitAssignerTestB
   @Test
   public void testSplitSort() throws Exception {
     SplitAssigner assigner = splitAssigner();
-    List<IcebergSourceSplit> splits =
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 5, 1, "2");
+    List<IcebergSourceSplit> splits = createSplits(5, 1, "2");
 
     assigner.onDiscoveredSplits(splits.subList(3, 5));
     assigner.onDiscoveredSplits(splits.subList(0, 1));
@@ -76,7 +72,7 @@ public class TestFileSequenceNumberBasedSplitAssigner extends 
SplitAssignerTestB
     Assert.assertNotNull(comparator);
   }
 
-  protected void assertGetNext(SplitAssigner assigner, Long expectedSequenceNumber) {
+  private void assertGetNext(SplitAssigner assigner, Long expectedSequenceNumber) {
     GetSplitResult result = assigner.getNext(null);
     ContentFile file = result.split().task().files().iterator().next().file();
     Assert.assertEquals(expectedSequenceNumber, file.fileSequenceNumber());
diff --git 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java
 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java
new file mode 100644
index 0000000000..e1fc63fda9
--- /dev/null
+++ 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
+import org.apache.iceberg.flink.source.reader.ReaderUtil;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
+import org.apache.iceberg.flink.source.split.SplitComparators;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SerializationUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestWatermarkBasedSplitAssigner extends SplitAssignerTestBase {
+  public static final Schema SCHEMA =
+      new Schema(required(1, "timestamp_column", Types.TimestampType.withoutZone()));
+  private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA);
+
+  @Override
+  protected SplitAssigner splitAssigner() {
+    return new OrderedSplitAssignerFactory(
+            SplitComparators.watermark(
+                new ColumnStatsWatermarkExtractor(SCHEMA, "timestamp_column", null)))
+        .createAssigner();
+
+  /** Test the assigner when multiple files are in a single split */
+  @Test
+  public void testMultipleFilesInAnIcebergSplit() {
+    SplitAssigner assigner = splitAssigner();
+    assigner.onDiscoveredSplits(createSplits(4, 2, "2"));
+
+    assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+    assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+    assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
+  }
+
+  /** Test sorted splits */
+  @Test
+  public void testSplitSort() {
+    SplitAssigner assigner = splitAssigner();
+
+    Instant now = Instant.now();
+    List<IcebergSourceSplit> splits =
+        IntStream.range(0, 5)
+            .mapToObj(i -> splitFromInstant(now.plus(i, ChronoUnit.MINUTES)))
+            .collect(Collectors.toList());
+
+    assigner.onDiscoveredSplits(splits.subList(3, 5));
+    assigner.onDiscoveredSplits(splits.subList(0, 1));
+    assigner.onDiscoveredSplits(splits.subList(1, 3));
+
+    assertGetNext(assigner, splits.get(0));
+    assertGetNext(assigner, splits.get(1));
+    assertGetNext(assigner, splits.get(2));
+    assertGetNext(assigner, splits.get(3));
+    assertGetNext(assigner, splits.get(4));
+
+    assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
+  }
+
+  @Test
+  public void testSerializable() {
+    byte[] bytes =
+        SerializationUtil.serializeToBytes(
+            SplitComparators.watermark(
+                new ColumnStatsWatermarkExtractor(
+                    TestFixtures.SCHEMA, "id", TimeUnit.MILLISECONDS)));
+    SerializableComparator<IcebergSourceSplit> comparator =
+        SerializationUtil.deserializeFromBytes(bytes);
+    Assert.assertNotNull(comparator);
+  }
+
+  private void assertGetNext(SplitAssigner assigner, IcebergSourceSplit split) {
+    GetSplitResult result = assigner.getNext(null);
+    Assert.assertEquals(result.split(), split);
+  }
+
+  @Override
+  protected List<IcebergSourceSplit> createSplits(
+      int fileCount, int filesPerSplit, String version) {
+    return IntStream.range(0, fileCount / filesPerSplit)
+        .mapToObj(
+            splitNum ->
+                splitFromRecords(
+                    IntStream.range(0, filesPerSplit)
+                        .mapToObj(
+                            fileNum ->
+                                RandomGenericData.generate(
+                                    SCHEMA, 2, splitNum * filesPerSplit + fileNum))
+                        .collect(Collectors.toList())))
+        .collect(Collectors.toList());
+  }
+
+  private IcebergSourceSplit splitFromInstant(Instant instant) {
+    Record record = GenericRecord.create(SCHEMA);
+    record.set(0, LocalDateTime.ofInstant(instant, ZoneOffset.UTC));
+    return splitFromRecords(ImmutableList.of(ImmutableList.of(record)));
+  }
+
+  private IcebergSourceSplit splitFromRecords(List<List<Record>> records) {
+    try {
+      return IcebergSourceSplit.fromCombinedScanTask(
+          ReaderUtil.createCombinedScanTask(
+              records, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY));
+    } catch (IOException e) {
+      throw new RuntimeException("Split creation exception", e);
+    }
+  }
+}
diff --git 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
index f9ceaf8422..2a2503ef24 100644
--- 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
+++ 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
@@ -60,9 +60,12 @@ public class ReaderUtil {
       FileFormat fileFormat,
       FileAppenderFactory<Record> appenderFactory)
       throws IOException {
-    try (FileAppender<Record> appender =
-        appenderFactory.newAppender(Files.localOutput(file), fileFormat)) {
+    FileAppender<Record> appender =
+        appenderFactory.newAppender(Files.localOutput(file), fileFormat);
+    try {
       appender.addAll(records);
+    } finally {
+      appender.close();
     }
 
     DataFile dataFile =
@@ -71,6 +74,7 @@ public class ReaderUtil {
             .withFileSizeInBytes(file.length())
             .withPath(file.toString())
             .withFormat(fileFormat)
+            .withMetrics(appender.metrics())
             .build();
 
     ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(Expressions.alwaysTrue());
diff --git 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
new file mode 100644
index 0000000000..afe8a5d015
--- /dev/null
+++ 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestColumnStatsWatermarkExtractor {
+  public static final Schema SCHEMA =
+      new Schema(
+          required(1, "timestamp_column", Types.TimestampType.withoutZone()),
+          required(2, "timestamptz_column", Types.TimestampType.withZone()),
+          required(3, "long_column", Types.LongType.get()),
+          required(4, "string_column", Types.StringType.get()));
+
+  private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA);
+
+  private static final List<List<Record>> TEST_RECORDS =
+      ImmutableList.of(
+          RandomGenericData.generate(SCHEMA, 3, 2L), RandomGenericData.generate(SCHEMA, 3, 19L));
+
+  private static final List<Map<String, Long>> MIN_VALUES =
+      ImmutableList.of(Maps.newHashMapWithExpectedSize(3), Maps.newHashMapWithExpectedSize(3));
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, SCHEMA);
+
+  private final String columnName;
+
+  @BeforeClass
+  public static void updateMinValue() {
+    for (int i = 0; i < TEST_RECORDS.size(); ++i) {
+      for (Record r : TEST_RECORDS.get(i)) {
+        Map<String, Long> minValues = MIN_VALUES.get(i);
+
+        LocalDateTime localDateTime = (LocalDateTime) r.get(0);
+        minValues.merge(
+            "timestamp_column", 
localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli(), Math::min);
+
+        OffsetDateTime offsetDateTime = (OffsetDateTime) r.get(1);
+        minValues.merge("timestamptz_column", 
offsetDateTime.toInstant().toEpochMilli(), Math::min);
+
+        minValues.merge("long_column", (Long) r.get(2), Math::min);
+      }
+    }
+  }
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> data() {
+    return ImmutableList.of(
+        new Object[] {"timestamp_column"},
+        new Object[] {"timestamptz_column"},
+        new Object[] {"long_column"});
+  }
+
+  public TestColumnStatsWatermarkExtractor(String columnName) {
+    this.columnName = columnName;
+  }
+
+  @Test
+  public void testSingle() throws IOException {
+    ColumnStatsWatermarkExtractor extractor =
+        new ColumnStatsWatermarkExtractor(SCHEMA, columnName, TimeUnit.MILLISECONDS);
+
+    Assert.assertEquals(
+        MIN_VALUES.get(0).get(columnName).longValue(), extractor.extractWatermark(split(0)));
+  }
+
+  @Test
+  public void testTimeUnit() throws IOException {
+    Assume.assumeTrue("Run only for long column", 
columnName.equals("long_column"));
+    ColumnStatsWatermarkExtractor extractor =
+        new ColumnStatsWatermarkExtractor(SCHEMA, columnName, 
TimeUnit.MICROSECONDS);
+
+    Assert.assertEquals(
+        MIN_VALUES.get(0).get(columnName).longValue() / 1000L,
+        extractor.extractWatermark(split(0)));
+  }
+
+  @Test
+  public void testMultipleFiles() throws IOException {
+    Assume.assumeTrue("Run only for the timestamp column", 
columnName.equals("timestamp_column"));
+    IcebergSourceSplit combinedSplit =
+        IcebergSourceSplit.fromCombinedScanTask(
+            ReaderUtil.createCombinedScanTask(
+                TEST_RECORDS, TEMPORARY_FOLDER, FileFormat.PARQUET, 
APPENDER_FACTORY));
+
+    ColumnStatsWatermarkExtractor extractor =
+        new ColumnStatsWatermarkExtractor(SCHEMA, columnName, null);
+
+    Assert.assertEquals(
+        MIN_VALUES.get(0).get(columnName).longValue(), 
extractor.extractWatermark(split(0)));
+    Assert.assertEquals(
+        MIN_VALUES.get(1).get(columnName).longValue(), 
extractor.extractWatermark(split(1)));
+    Assert.assertEquals(
+        Math.min(MIN_VALUES.get(0).get(columnName), 
MIN_VALUES.get(1).get(columnName)),
+        extractor.extractWatermark(combinedSplit));
+  }
+
+  @Test
+  public void testWrongColumn() {
+    Assume.assumeTrue("Run only for string column", 
columnName.equals("string_column"));
+    Assertions.assertThatThrownBy(() -> new 
ColumnStatsWatermarkExtractor(SCHEMA, columnName, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining(
+            "Found STRING, expected a LONG or TIMESTAMP column for watermark 
generation.");
+  }
+
+  @Test
+  public void testEmptyStatistics() throws IOException {
+    Assume.assumeTrue("Run only for timestamp column", 
columnName.equals("timestamp_column"));
+
+    // Create an extractor for a column we do not have statistics
+    ColumnStatsWatermarkExtractor extractor =
+        new ColumnStatsWatermarkExtractor(10, "missing_field");
+    Assertions.assertThatThrownBy(() -> extractor.extractWatermark(split(0)))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Missing statistics for column");
+  }
+
+  private IcebergSourceSplit split(int id) throws IOException {
+    return IcebergSourceSplit.fromCombinedScanTask(
+        ReaderUtil.createCombinedScanTask(
+            ImmutableList.of(TEST_RECORDS.get(id)),
+            TEMPORARY_FOLDER,
+            FileFormat.PARQUET,
+            APPENDER_FACTORY));
+  }
+}
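
As testTimeUnit above exercises, a long watermark column is reduced to epoch milliseconds
with the configured TimeUnit. A minimal sketch of that arithmetic (the microsecond value
below is an assumed example, not taken from the test data):

    // extractWatermark() applies timeUnit.toMillis() to the column's min lower bound
    long minMicros = 1_698_000_000_000_000L; // assumed column min, in microseconds
    long watermarkMillis = TimeUnit.MICROSECONDS.toMillis(minMicros); // = 1_698_000_000_000L
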
diff --git 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
index def4f43685..88234c6112 100644
--- 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
+++ 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
@@ -167,7 +167,12 @@ public class TestIcebergSourceReader {
             new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
             new PlaintextEncryptionManager(),
             Collections.emptyList());
-    return new IcebergSourceReader<>(readerMetrics, readerFunction, splitComparator, readerContext);
+    return new IcebergSourceReader<>(
+        SerializableRecordEmitter.defaultEmitter(),
+        readerMetrics,
+        readerFunction,
+        splitComparator,
+        readerContext);
   }
 
   private static class IdBasedComparator implements SerializableComparator<IcebergSourceSplit> {
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index f85f627726..179253cb3a 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.connector.source.Boundedness;
@@ -58,15 +59,20 @@ import 
org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl;
 import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
 import 
org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
 import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
+import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
 import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
 import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
 import org.apache.iceberg.flink.source.reader.MetaDataReaderFunction;
 import org.apache.iceberg.flink.source.reader.ReaderFunction;
 import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
+import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter;
+import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
 import org.apache.iceberg.flink.source.split.SerializableComparator;
+import org.apache.iceberg.flink.source.split.SplitComparators;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,6 +86,7 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
   private final ReaderFunction<T> readerFunction;
   private final SplitAssignerFactory assignerFactory;
   private final SerializableComparator<IcebergSourceSplit> splitComparator;
+  private final SerializableRecordEmitter<T> emitter;
 
   // Can't use SerializableTable as enumerator needs a regular table
   // that can discover table changes
@@ -91,13 +98,15 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
       ReaderFunction<T> readerFunction,
       SplitAssignerFactory assignerFactory,
       SerializableComparator<IcebergSourceSplit> splitComparator,
-      Table table) {
+      Table table,
+      SerializableRecordEmitter<T> emitter) {
     this.tableLoader = tableLoader;
     this.scanContext = scanContext;
     this.readerFunction = readerFunction;
     this.assignerFactory = assignerFactory;
     this.splitComparator = splitComparator;
     this.table = table;
+    this.emitter = emitter;
   }
 
   String name() {
@@ -152,7 +161,8 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
   public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) {
     IcebergSourceReaderMetrics metrics =
         new IcebergSourceReaderMetrics(readerContext.metricGroup(), lazyTable().name());
-    return new IcebergSourceReader<>(metrics, readerFunction, splitComparator, readerContext);
+    return new IcebergSourceReader<>(
+        emitter, metrics, readerFunction, splitComparator, readerContext);
   }
 
   @Override
@@ -216,6 +226,8 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
     private Table table;
     private SplitAssignerFactory splitAssignerFactory;
     private SerializableComparator<IcebergSourceSplit> splitComparator;
+    private String watermarkColumn;
+    private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS;
     private ReaderFunction<T> readerFunction;
     private ReadableConfig flinkConfig = new Configuration();
     private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -237,6 +249,9 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
     }
 
     public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
+      Preconditions.checkArgument(
+          watermarkColumn == null,
+          "Watermark column and SplitAssigner should not be set in the same 
source");
       this.splitAssignerFactory = assignerFactory;
       return this;
     }
@@ -429,6 +444,33 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
       return this;
     }
 
+    /**
+     * Emits watermarks once per split based on the min value of column statistics from the file
+     * metadata in the given split. The generated watermarks are also used for ordering the splits
+     * for read. Accepted column types are timestamp/timestamptz/long. For long columns consider
+     * setting {@link #watermarkTimeUnit(TimeUnit)}.
+     *
+     * <p>Consider setting `read.split.open-file-cost` to prevent combining small files into a
+     * single split when the watermark is used for watermark alignment.
+     */
+    public Builder<T> watermarkColumn(String columnName) {
+      Preconditions.checkArgument(
+          splitAssignerFactory == null,
+          "Watermark column and SplitAssigner should not be set in the same 
source");
+      this.watermarkColumn = columnName;
+      return this;
+    }
+
+    /**
+     * Sets the {@link TimeUnit} used to convert the column value when the type of the {@link
+     * #watermarkColumn} is {@link org.apache.iceberg.types.Types.LongType}. The default value is
+     * {@link TimeUnit#MICROSECONDS}.
+     */
+    public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) {
+      this.watermarkTimeUnit = timeUnit;
+      return this;
+    }
+
     /** @deprecated Use {@link #setAll} instead. */
     @Deprecated
     public Builder<T> properties(Map<String, String> properties) {
@@ -453,6 +495,18 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
       }
 
+      SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
+      if (watermarkColumn != null) {
+        // Column statistics are needed for watermark generation
+        contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));
+
+        SplitWatermarkExtractor watermarkExtractor =
+            new ColumnStatsWatermarkExtractor(icebergSchema, watermarkColumn, watermarkTimeUnit);
+        emitter = SerializableRecordEmitter.emitterWithWatermark(watermarkExtractor);
+        splitAssignerFactory =
+            new OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor));
+      }
+
       ScanContext context = contextBuilder.build();
       if (readerFunction == null) {
         if (table instanceof BaseMetadataTable) {
@@ -485,8 +539,14 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
 
       checkRequired();
       // Since the builder already loaded the table, pass it to the source to avoid double loading
-      return new IcebergSource<T>(
-          tableLoader, context, readerFunction, splitAssignerFactory, splitComparator, table);
+      return new IcebergSource<>(
+          tableLoader,
+          context,
+          readerFunction,
+          splitAssignerFactory,
+          splitComparator,
+          table,
+          emitter);
     }
 
     private void checkRequired() {
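
For orientation, a minimal usage sketch of the new builder options (the tableLoader and the
column name "event_ts" are assumptions for illustration, not part of this commit):

    IcebergSource<RowData> source =
        IcebergSource.<RowData>builder()
            .tableLoader(tableLoader) // assumed to be configured elsewhere
            .watermarkColumn("event_ts") // hypothetical long column storing microseconds
            .watermarkTimeUnit(TimeUnit.MICROSECONDS)
            .build();

With watermarkColumn set, build() wires in the watermark emitter and the ordered split
assigner shown above, so no assignerFactory may be supplied at the same time.
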
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java
new file mode 100644
index 0000000000..4bb6f0a98c
--- /dev/null
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type.TypeID;
+import org.apache.iceberg.types.Types;
+
+/**
+ * {@link SplitWatermarkExtractor} implementation which uses an Iceberg timestamp or long column's
+ * statistics to get the watermarks for the {@link IcebergSourceSplit}. This watermark is emitted
+ * by the {@link WatermarkExtractorRecordEmitter} along with the actual records.
+ */
+@Internal
+public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, Serializable {
+  private final int eventTimeFieldId;
+  private final String eventTimeFieldName;
+  private final TimeUnit timeUnit;
+
+  /**
+   * Creates the extractor.
+   *
+   * @param schema The schema of the Table
+   * @param eventTimeFieldName The column which should be used as an event time
+   * @param timeUnit Used for converting the long value to epoch milliseconds
+   */
+  public ColumnStatsWatermarkExtractor(
+      Schema schema, String eventTimeFieldName, TimeUnit timeUnit) {
+    Types.NestedField field = schema.findField(eventTimeFieldName);
+    TypeID typeID = field.type().typeId();
+    Preconditions.checkArgument(
+        typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP),
+        "Found %s, expected a LONG or TIMESTAMP column for watermark 
generation.",
+        typeID);
+    this.eventTimeFieldId = field.fieldId();
+    this.eventTimeFieldName = eventTimeFieldName;
+    // Use the timeUnit only for Long columns.
+    this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : TimeUnit.MICROSECONDS;
+  }
+
+  @VisibleForTesting
+  ColumnStatsWatermarkExtractor(int eventTimeFieldId, String eventTimeFieldName) {
+    this.eventTimeFieldId = eventTimeFieldId;
+    this.eventTimeFieldName = eventTimeFieldName;
+    this.timeUnit = TimeUnit.MICROSECONDS;
+  }
+
+  /**
+   * Get the watermark for a split using column statistics.
+   *
+   * @param split The split
+   * @return The watermark
+   * @throws IllegalArgumentException if there are no statistics for the column
+   */
+  @Override
+  public long extractWatermark(IcebergSourceSplit split) {
+    return split.task().files().stream()
+        .map(
+            scanTask -> {
+              Preconditions.checkArgument(
+                  scanTask.file().lowerBounds() != null
+                      && scanTask.file().lowerBounds().get(eventTimeFieldId) != null,
+                  "Missing statistics for column name = %s in file = %s",
+                  eventTimeFieldName,
+                  scanTask.file());
+              return timeUnit.toMillis(
+                  Conversions.fromByteBuffer(
+                      Types.LongType.get(), scanTask.file().lowerBounds().get(eventTimeFieldId)));
+            })
+        .min(Comparator.comparingLong(l -> l))
+        .get();
+  }
+}
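
A short sketch of the extractor in isolation (schema and split are assumed to exist, and
"event_ts" is a hypothetical column name):

    // The watermark is the min lower bound of "event_ts" across the files in the
    // split, converted to epoch milliseconds
    SplitWatermarkExtractor extractor =
        new ColumnStatsWatermarkExtractor(schema, "event_ts", TimeUnit.MICROSECONDS);
    long watermarkMillis = extractor.extractWatermark(split);
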
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
index 8d7d68f961..f143b8d2df 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
@@ -35,13 +35,14 @@ public class IcebergSourceReader<T>
         RecordAndPosition<T>, T, IcebergSourceSplit, IcebergSourceSplit> {
 
   public IcebergSourceReader(
+      SerializableRecordEmitter<T> emitter,
       IcebergSourceReaderMetrics metrics,
       ReaderFunction<T> readerFunction,
       SerializableComparator<IcebergSourceSplit> splitComparator,
       SourceReaderContext context) {
     super(
         () -> new IcebergSourceSplitReader<>(metrics, readerFunction, splitComparator, context),
-        new IcebergSourceRecordEmitter<>(),
+        emitter,
         context.getConfiguration(),
         context);
   }
diff --git 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/SerializableRecordEmitter.java
similarity index 61%
copy from 
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
copy to 
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/SerializableRecordEmitter.java
index 337d9d3c42..a6e2c1dae2 100644
--- 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/SerializableRecordEmitter.java
@@ -18,19 +18,23 @@
  */
 package org.apache.iceberg.flink.source.reader;
 
-import org.apache.flink.api.connector.source.SourceOutput;
+import java.io.Serializable;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 
-final class IcebergSourceRecordEmitter<T>
-    implements RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit> {
-
-  IcebergSourceRecordEmitter() {}
+@Internal
+@FunctionalInterface
+public interface SerializableRecordEmitter<T>
+    extends RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit>, Serializable {
+  static <T> SerializableRecordEmitter<T> defaultEmitter() {
+    return (element, output, split) -> {
+      output.collect(element.record());
+      split.updatePosition(element.fileOffset(), element.recordOffset());
+    };
+  }
 
-  @Override
-  public void emitRecord(
-      RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {
-    output.collect(element.record());
-    split.updatePosition(element.fileOffset(), element.recordOffset());
+  static <T> SerializableRecordEmitter<T> emitterWithWatermark(SplitWatermarkExtractor extractor) {
+    return new WatermarkExtractorRecordEmitter<>(extractor);
   }
 }
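
The two factory methods mirror the selection done in IcebergSource.Builder.build() above; a
condensed sketch (watermarkColumn and watermarkExtractor as defined there):

    SerializableRecordEmitter<RowData> emitter =
        watermarkColumn == null
            ? SerializableRecordEmitter.defaultEmitter()
            : SerializableRecordEmitter.emitterWithWatermark(watermarkExtractor);
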
diff --git 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/SplitWatermarkExtractor.java
similarity index 63%
rename from 
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
rename to 
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/SplitWatermarkExtractor.java
index 337d9d3c42..d1c50ac8ca 100644
--- 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/SplitWatermarkExtractor.java
@@ -18,19 +18,11 @@
  */
 package org.apache.iceberg.flink.source.reader;
 
-import org.apache.flink.api.connector.source.SourceOutput;
-import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import java.io.Serializable;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 
-final class IcebergSourceRecordEmitter<T>
-    implements RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit> {
-
-  IcebergSourceRecordEmitter() {}
-
-  @Override
-  public void emitRecord(
-      RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {
-    output.collect(element.record());
-    split.updatePosition(element.fileOffset(), element.recordOffset());
-  }
+/** The interface used to extract watermarks from splits. */
+public interface SplitWatermarkExtractor extends Serializable {
+  /** Get the watermark for a split. */
+  long extractWatermark(IcebergSourceSplit split);
 }
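
Since the interface is a single-method Serializable contract, other strategies can be plugged
in. A hypothetical sketch (not part of this commit) that orders splits by their min file
sequence number and falls back to 0 where the numbers are absent (they are null for V1 tables):

    SplitWatermarkExtractor bySequenceNumber =
        split ->
            split.task().files().stream()
                .map(scanTask -> scanTask.file().fileSequenceNumber()) // may be null
                .filter(java.util.Objects::nonNull)
                .mapToLong(Long::longValue)
                .min()
                .orElse(0L);
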
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java
new file mode 100644
index 0000000000..02ef57d344
--- /dev/null
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Emitter which emits the watermarks, records and updates the split position.
+ *
+ * <p>The emitter emits the watermark, provided by the {@link SplitWatermarkExtractor}, at the
+ * beginning of every split.
+ */
+class WatermarkExtractorRecordEmitter<T> implements SerializableRecordEmitter<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(WatermarkExtractorRecordEmitter.class);
+  private final SplitWatermarkExtractor timeExtractor;
+  private String lastSplitId = null;
+  private long watermark;
+
+  WatermarkExtractorRecordEmitter(SplitWatermarkExtractor timeExtractor) {
+    this.timeExtractor = timeExtractor;
+  }
+
+  @Override
+  public void emitRecord(
+      RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {
+    if (!split.splitId().equals(lastSplitId)) {
+      long newWatermark = timeExtractor.extractWatermark(split);
+      if (newWatermark < watermark) {
+        LOG.info(
+            "Received a new split with lower watermark. Previous watermark = 
{}, current watermark = {}, previous split = {}, current split = {}",
+            watermark,
+            newWatermark,
+            lastSplitId,
+            split.splitId());
+      } else {
+        watermark = newWatermark;
+        output.emitWatermark(new Watermark(watermark));
+        LOG.debug("Watermark = {} emitted based on split = {}", watermark, 
lastSplitId);
+      }
+
+      lastSplitId = split.splitId();
+    }
+
+    output.collect(element.record());
+    split.updatePosition(element.fileOffset(), element.recordOffset());
+  }
+}
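
An illustrative trace of the emitter above, under assumed watermarks of 0 and 6000000 for two
splits A and B:

    // first record of split A  -> emitWatermark(0), then collect(record)
    // later records of split A -> collect(record) only (same splitId)
    // first record of split B  -> emitWatermark(6000000), then collect(record)
    // a split with a lower watermark than the current one -> logged, nothing emitted
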
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
index 64e03d77de..56ee92014d 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.source.split;
 
+import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 /**
@@ -45,7 +46,7 @@ public class SplitComparators {
           o1);
       Preconditions.checkNotNull(
           seq2,
-          "IInvalid file sequence number: null. Doesn't support splits written 
with V1 format: %s",
+          "Invalid file sequence number: null. Doesn't support splits written 
with V1 format: %s",
           o2);
 
       int temp = Long.compare(seq1, seq2);
@@ -56,4 +57,20 @@ public class SplitComparators {
       }
     };
   }
+
+  /** Comparator which orders the splits based on the watermarks of the splits */
+  public static SerializableComparator<IcebergSourceSplit> watermark(
+      SplitWatermarkExtractor watermarkExtractor) {
+    return (IcebergSourceSplit o1, IcebergSourceSplit o2) -> {
+      long watermark1 = watermarkExtractor.extractWatermark(o1);
+      long watermark2 = watermarkExtractor.extractWatermark(o2);
+
+      int temp = Long.compare(watermark1, watermark2);
+      if (temp != 0) {
+        return temp;
+      } else {
+        return o1.splitId().compareTo(o2.splitId());
+      }
+    };
+  }
 }
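
This comparator is what build() wires into the assigner when a watermark column is configured;
the equivalent manual wiring (watermarkExtractor as above) would be:

    SplitAssignerFactory assignerFactory =
        new OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor));

Ties on equal watermarks fall back to splitId comparison, giving a stable total order.
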
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
index 70e7a79d83..7d991ee603 100644
--- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
@@ -97,6 +98,11 @@ public class TestIcebergSourceFailover {
     return RandomGenericData.generate(schema(), numRecords, seed);
   }
 
+  protected void assertRecords(Table table, List<Record> expectedRecords, Duration timeout)
+      throws Exception {
+    SimpleDataUtil.assertTableRecords(table, expectedRecords, timeout);
+  }
+
   @Test
   public void testBoundedWithTaskManagerFailover() throws Exception {
     testBoundedIcebergSource(FailoverType.TM);
@@ -150,8 +156,7 @@ public class TestIcebergSourceFailover {
         RecordCounterToFail::continueProcessing,
         miniClusterResource.getMiniCluster());
 
-    SimpleDataUtil.assertTableRecords(
-        sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+    assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
   }
 
   @Test
@@ -214,8 +219,7 @@ public class TestIcebergSourceFailover {
 
     // wait longer for continuous source to reduce flakiness
     // because CI servers tend to be overloaded.
-    SimpleDataUtil.assertTableRecords(
-        sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+    assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
   }
 
   // ------------------------------------------------------------------------
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java
new file mode 100644
index 0000000000..f7dc931c50
--- /dev/null
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+
+public class TestIcebergSourceFailoverWithWatermarkExtractor extends TestIcebergSourceFailover {
+  // Increment ts by 15 minutes for each generateRecords batch
+  private static final long RECORD_BATCH_TS_INCREMENT_MILLI = TimeUnit.MINUTES.toMillis(15);
+  // Within a batch, increment ts by 1 second
+  private static final long RECORD_TS_INCREMENT_MILLI = TimeUnit.SECONDS.toMillis(1);
+
+  private final AtomicLong tsMilli = new AtomicLong(System.currentTimeMillis());
+
+  @Override
+  protected IcebergSource.Builder<RowData> sourceBuilder() {
+    return IcebergSource.<RowData>builder()
+        .tableLoader(sourceTableResource.tableLoader())
+        .watermarkColumn("ts")
+        .project(TestFixtures.TS_SCHEMA);
+  }
+
+  @Override
+  protected Schema schema() {
+    return TestFixtures.TS_SCHEMA;
+  }
+
+  @Override
+  protected List<Record> generateRecords(int numRecords, long seed) {
+    // Override the ts field to create a more realistic situation for event time alignment
+    tsMilli.addAndGet(RECORD_BATCH_TS_INCREMENT_MILLI);
+    return RandomGenericData.generate(schema(), numRecords, seed).stream()
+        .peek(
+            record -> {
+              LocalDateTime ts =
+                  LocalDateTime.ofInstant(
+                      Instant.ofEpochMilli(tsMilli.addAndGet(RECORD_TS_INCREMENT_MILLI)),
+                      ZoneId.of("Z"));
+              record.setField("ts", ts);
+            })
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * This override is needed because the {@link Comparators} used by {@link StructLikeWrapper}
+   * represent the Timestamp type internally as a Long, while {@link RandomGenericData} generates
+   * {@link LocalDateTime} for {@code TimestampType.withoutZone()}. This method normalizes the
+   * {@link LocalDateTime} to a Long so that the Comparators can continue to work.
+   */
+  @Override
+  protected void assertRecords(Table table, List<Record> expectedRecords, Duration timeout)
+      throws Exception {
+    List<Record> expectedNormalized = convertLocalDateTimeToMilli(expectedRecords);
+    Awaitility.await("expected list of records should be produced")
+        .atMost(timeout)
+        .untilAsserted(
+            () -> {
+              SimpleDataUtil.equalsRecords(
+                  expectedNormalized,
+                  convertLocalDateTimeToMilli(SimpleDataUtil.tableRecords(table)),
+                  table.schema());
+              SimpleDataUtil.assertRecordsEqual(
+                  expectedNormalized,
+                  convertLocalDateTimeToMilli(SimpleDataUtil.tableRecords(table)),
+                  table.schema());
+            });
+  }
+
+  private List<Record> convertLocalDateTimeToMilli(List<Record> records) {
+    return records.stream()
+        .peek(
+            r -> {
+              LocalDateTime localDateTime = ((LocalDateTime) r.getField("ts"));
+              r.setField("ts", 
localDateTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli());
+            })
+        .collect(Collectors.toList());
+  }
+}
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
new file mode 100644
index 0000000000..0bb2eb7766
--- /dev/null
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+  private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
+  private static final int PARALLELISM = 4;
+  private static final String SOURCE_NAME = "IcebergSource";
+  private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+  private static final ConcurrentMap<Long, Integer> windows = Maps.newConcurrentMap();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .setConfiguration(reporter.addToConfiguration(new Configuration()))
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(
+          TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA);
+
+  /**
+   * This is an integration test for watermark handling and windowing. Integration testing the
+   * following features:
+   *
+   * <ul>
+   *   <li>- Ordering of the splits
+   *   <li>- Emitting of watermarks
+   *   <li>- Firing windows based on watermarks
+   * </ul>
+   *
+   * <p>The test generates 4 splits
+   *
+   * <ul>
+   *   <li>- Split 1 - Watermark 100 min
+   *   <li>- Split 2, 3 - Watermark 0 min
+   *   <li>- Split 4 - Watermark 6 min
+   * </ul>
+   *
+   * <p>Creates a source with a 5-minute tumbling window and parallelism 1 (to prevent
+   * concurrency issues).
+   *
+   * <p>Checks that windows are handled correctly based on the emitted watermarks, and that
+   * splits are read in the following order:
+   *
+   * <ul>
+   *   <li>- Split 2, 3
+   *   <li>- Split 4
+   *   <li>- Split 1
+   * </ul>
+   *
+   * <p>As a result the window aggregator emits the records based on the Split 2-3 and Split 4
+   * data.
+   *
+   * <p>Adds 2 more splits, so the task manager closes the windows for the original 4 splits and
+   * emits the appropriate aggregated records.
+   */
+  @Test
+  public void testWindowing() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 3 records (100, "file_1-recordTs_100"), (101, "file_1-recordTs_101"),
+    //      (103, "file_1-recordTs_103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    //    - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    // - File 3 - Parallel write for the first records (Watermark 360000)
+    //    - Split 1 - 2 records (6, "file_3-recordTs_6"), (7, "file_3-recordTs_7")
+    List<Record> batch =
+        ImmutableList.of(
+            generateRecord(100, "file_1-recordTs_100"),
+            generateRecord(101, "file_1-recordTs_101"),
+            generateRecord(103, "file_1-recordTs_103"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      // Generate records where the timestamps are out of order, but still between 0-5 minutes
+      batch.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
+    }
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    batch =
+        ImmutableList.of(
+            generateRecord(6, "file_3-recordTs_6"), generateRecord(7, 
"file_3-recordTs_7"));
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(1);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            source(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withTimestampAssigner(new RowDataTimestampAssigner()),
+            SOURCE_NAME,
+            TypeInformation.of(RowData.class));
+
+    stream
+        .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
+        .apply(
+            new AllWindowFunction<RowData, RowData, TimeWindow>() {
+              @Override
+              public void apply(
+                  TimeWindow window, Iterable<RowData> values, Collector<RowData> out) {
+                // Emit RowData which contains the window start time, and the record count in
+                // that window
+                AtomicInteger count = new AtomicInteger(0);
+                values.forEach(a -> count.incrementAndGet());
+                out.collect(row(window.getStart(), count.get()));
+                windows.put(window.getStart(), count.get());
+              }
+            });
+
+    // Use a static variable to collect the windows, since other solutions were flaky
+    windows.clear();
+    env.executeAsync("Iceberg Source Windowing Test");
+
+    // Wait for the 2 first windows from File 2 and File 3
+    Awaitility.await()
+        .pollInterval(Duration.ofMillis(10))
+        .atMost(30, TimeUnit.SECONDS)
+        .until(
+            () ->
+                windows.equals(
+                    ImmutableMap.of(0L, RECORD_NUM_FOR_2_SPLITS, TimeUnit.MINUTES.toMillis(5), 2)));
+
+    // Write data so the windows containing test data are closed
+    dataAppender.appendToTable(
+        dataAppender.writeFile(ImmutableList.of(generateRecord(1500, "last-record"))));
+
+    // Wait for the last test record window from File 1
+    Awaitility.await()
+        .pollInterval(Duration.ofMillis(10))
+        .atMost(30, TimeUnit.SECONDS)
+        .until(
+            () ->
+                windows.equals(
+                    ImmutableMap.of(
+                        0L,
+                        RECORD_NUM_FOR_2_SPLITS,
+                        TimeUnit.MINUTES.toMillis(5),
+                        2,
+                        TimeUnit.MINUTES.toMillis(100),
+                        3)));
+  }
+
+  /**
+   * This is an integration test for watermark handling and throttling. It tests the
+   * following:
+   *
+   * <ul>
+   *   <li>- Emitting of watermarks
+   *   <li>- Watermark alignment
+   * </ul>
+   *
+   * <p>The test generates 3 splits
+   *
+   * <ul>
+   *   <li>- Split 1 - Watermark 100 min
+   *   <li>- Split 2, 3 - Watermark 0 min
+   * </ul>
+   *
+   * The splits are read in the following order:
+   *
+   * <ul>
+   *   <li>- Split 2, 3 (Task Manager 1, Task Manager 2)
+   *   <li>- Split 1 (Task Manager 1 or Task Manager 2 depending on scheduling)
+   * </ul>
+   *
+   * Reading split 1 will cause the watermark alignment to pause reading for the given task
+   * manager.
+   *
+   * <p>The status of the watermark alignment is checked by the alignment-related metrics.
+   *
+   * <p>Adding new records with old timestamps to the table will enable the running reader to
+   * continue reading the files, but the watermark alignment will still prevent the paused reader
+   * from continuing.
+   *
+   * <p>After adding some records with new timestamps the blocked reader is un-paused, and both of
+   * the readers continue reading.
+   */
+  @Test
+  public void testThrottling() throws Exception {
+    GenericAppenderHelper dataAppender = appender();
+
+    // Generate records with the following pattern:
+    // - File 1 - Later records (Watermark 6000000)
+    //    - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
+    // - File 2 - First records (Watermark 0)
+    //    - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    //    - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    List<Record> batch =
+        ImmutableList.of(
+            generateRecord(100, "file_1-recordTs_100"), generateRecord(103, "file_1-recordTs_103"));
+    dataAppender.appendToTable(batch);
+
+    batch = Lists.newArrayListWithCapacity(100);
+    for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+      batch.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
+    }
+
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(2);
+
+    DataStream<RowData> stream =
+        env.fromSource(
+            source(),
+            WatermarkStrategy.<RowData>noWatermarks()
+                .withWatermarkAlignment("iceberg", Duration.ofMinutes(20), Duration.ofMillis(10)),
+            SOURCE_NAME,
+            TypeInformation.of(RowData.class));
+
+    try (CloseableIterator<RowData> resultIterator = stream.collectAsync()) {
+      JobClient jobClient = env.executeAsync("Iceberg Source Throttling Test");
+
+      // Check that the non-blocked data is read
+      // The first RECORD_NUM_FOR_2_SPLITS records should be read
+      // 1 or more records from the runaway reader may also arrive, depending on thread scheduling
+      waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1);
+
+      // Get the drift metric, wait for it to be created and reach the expected state
+      // (100 min - 20 min - 0 min)
+      // This also validates that the WatermarkAlignment is working
+      Awaitility.await()
+          .pollInterval(Duration.ofMillis(10))
+          .atMost(30, TimeUnit.SECONDS)
+          .until(
+              () ->
+                  findAlignmentDriftMetric(jobClient.getJobID(), TimeUnit.MINUTES.toMillis(80))
+                      .isPresent());
+      Gauge<Long> drift =
+          findAlignmentDriftMetric(jobClient.getJobID(), TimeUnit.MINUTES.toMillis(80)).get();
+
+      // Add some old records with 2 splits, so even if the blocked reader gets one split, the
+      // other reader gets one as well
+      List<Record> newBatch1 =
+          ImmutableList.of(
+              generateRecord(15, "file_3-recordTs_15"),
+              generateRecord(16, "file_3-recordTs_16"),
+              generateRecord(17, "file_3-recordTs_17"));
+      List<Record> newBatch2 =
+          ImmutableList.of(
+              generateRecord(15, "file_4-recordTs_15"),
+              generateRecord(16, "file_4-recordTs_16"),
+              generateRecord(17, "file_4-recordTs_17"));
+      dataAppender.appendToTable(
+          dataAppender.writeFile(newBatch1), dataAppender.writeFile(newBatch2));
+      // The records received will highly depend on scheduling
+      // We get at least 3 records from the non-blocked reader
+      // We might get 1 record from the blocked reader (as part of the previous batch -
+      // file_1-recordTs_100)
+      // We might get 3 more records from the non-blocked reader if it gets both new splits
+      waitForRecords(resultIterator, 3);
+
+      // Get the drift metric, wait for it to be created and reach the expected state
+      // (100 min - 20 min - 15 min)
+      Awaitility.await()
+          .pollInterval(Duration.ofMillis(10))
+          .atMost(30, TimeUnit.SECONDS)
+          .until(() -> drift.getValue() == TimeUnit.MINUTES.toMillis(65));
+
+      // Add some new records which should unblock the throttled reader
+      batch =
+          ImmutableList.of(
+              generateRecord(90, "file_5-recordTs_90"), generateRecord(91, "file_5-recordTs_91"));
+      dataAppender.appendToTable(batch);
+      // We should get all the records at this point
+      waitForRecords(resultIterator, 6);
+
+      // Wait for the new drift to decrease below the allowed drift to signal the normal state
+      Awaitility.await()
+          .pollInterval(Duration.ofMillis(10))
+          .atMost(30, TimeUnit.SECONDS)
+          .until(() -> drift.getValue() < TimeUnit.MINUTES.toMillis(20));
+    }
+  }
+
+  protected IcebergSource<RowData> source() {
+    return IcebergSource.<RowData>builder()
+        .tableLoader(sourceTableResource.tableLoader())
+        .watermarkColumn("ts")
+        .project(TestFixtures.TS_SCHEMA)
+        .splitSize(100L)
+        .streaming(true)
+        .monitorInterval(Duration.ofMillis(2))
+        .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+        .build();
+  }
+
+  protected Record generateRecord(int minutes, String str) {
+    // Override the ts field to create a more realistic situation for event time alignment
+    Record record = GenericRecord.create(TestFixtures.TS_SCHEMA);
+    LocalDateTime ts =
+        LocalDateTime.ofInstant(
+            Instant.ofEpochMilli(Time.of(minutes, TimeUnit.MINUTES).toMilliseconds()),
+            ZoneId.of("Z"));
+    record.setField("ts", ts);
+    record.setField("str", str);
+    return record;
+  }
+
+  protected void assertRecords(
+      Collection<Record> expectedRecords, CloseableIterator<RowData> iterator) throws Exception {
+    Set<RowData> expected =
+        expectedRecords.stream()
+            .map(e -> RowDataConverter.convert(TestFixtures.TS_SCHEMA, e))
+            .collect(Collectors.toSet());
+    Assert.assertEquals(expected, waitForRecords(iterator, expectedRecords.size()));
+  }
+
+  protected Set<RowData> waitForRecords(CloseableIterator<RowData> iterator, int num) {
+    Set<RowData> received = Sets.newHashSetWithExpectedSize(num);
+    assertThat(
+            CompletableFuture.supplyAsync(
+                () -> {
+                  int count = 0;
+                  while (count < num && iterator.hasNext()) {
+                    received.add(iterator.next());
+                    count++;
+                  }
+
+                  if (count < num) {
+                    throw new IllegalStateException(String.format("Failed to get %d records.", num));
+                  }
+
+                  return true;
+                }))
+        .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
+
+    return received;
+  }
+
+  private Optional<Gauge<Long>> findAlignmentDriftMetric(JobID jobID, long withValue) {
+    String metricsName = SOURCE_NAME + ".*" + MetricNames.WATERMARK_ALIGNMENT_DRIFT;
+    return reporter.findMetrics(jobID, metricsName).values().stream()
+        .map(m -> (Gauge<Long>) m)
+        .filter(m -> m.getValue() == withValue)
+        .findFirst();
+  }
+
+  private GenericAppenderHelper appender() {
+    // We need to create multiple splits, so we need to generate parquet files with multiple
+    // offsets
+    org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+    hadoopConf.set("write.parquet.page-size-bytes", "64");
+    hadoopConf.set("write.parquet.row-group-size-bytes", "64");
+    return new GenericAppenderHelper(
+        sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER, hadoopConf);
+  }
+
+  private static RowData row(long time, long count) {
+    GenericRowData result = new GenericRowData(2);
+    result.setField(0, time);
+    result.setField(1, String.valueOf(count));
+    return result;
+  }
+
+  private static class RowDataTimestampAssigner implements SerializableTimestampAssigner<RowData> {
+    @Override
+    public long extractTimestamp(RowData element, long recordTimestamp) {
+      return element.getTimestamp(0, 0).getMillisecond();
+    }
+  }
+}
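
For context on what the two tests above exercise, here is a minimal usage sketch
of the new watermarkColumn option combined with Flink's watermark alignment. The
table path, source name, and intervals below are illustrative assumptions, not
part of this commit:

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.source.IcebergSource;
    import org.apache.iceberg.flink.source.StreamingStartingStrategy;

    public class WatermarkAlignedReadSketch {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Watermarks are derived from the column stats of "ts"; splits are also
        // ordered by these watermarks before assignment.
        IcebergSource<RowData> source =
            IcebergSource.<RowData>builder()
                .tableLoader(TableLoader.fromHadoopTable("file:///tmp/warehouse/db/tbl"))
                .watermarkColumn("ts")
                .streaming(true)
                .monitorInterval(Duration.ofSeconds(30))
                .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
                .build();

        // Pause readers that run more than 20 minutes ahead of the slowest one.
        DataStream<RowData> stream =
            env.fromSource(
                source,
                WatermarkStrategy.<RowData>noWatermarks()
                    .withWatermarkAlignment("iceberg", Duration.ofMinutes(20), Duration.ofSeconds(1)),
                "iceberg-source",
                TypeInformation.of(RowData.class));

        stream.print();
        env.execute("Watermark aligned read sketch");
      }
    }
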
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
index f28677ca9d..090b304942 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
@@ -43,15 +43,13 @@ public abstract class SplitAssignerTestBase {
   @Test
   public void testStaticEnumeratorSequence() throws Exception {
     SplitAssigner assigner = splitAssigner();
-    assigner.onDiscoveredSplits(
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 1));
+    assigner.onDiscoveredSplits(createSplits(4, 1, "1"));
 
     assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
     assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
     assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
     assertSnapshot(assigner, 1);
-    assigner.onUnassignedSplits(
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1));
+    assigner.onUnassignedSplits(createSplits(1, 1, "1"));
     assertSnapshot(assigner, 2);
 
     assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
@@ -66,15 +64,12 @@ public abstract class SplitAssignerTestBase {
     SplitAssigner assigner = splitAssigner();
     assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
 
-    List<IcebergSourceSplit> splits1 =
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    List<IcebergSourceSplit> splits1 = createSplits(1, 1, "1");
     assertAvailableFuture(assigner, 1, () -> assigner.onDiscoveredSplits(splits1));
-    List<IcebergSourceSplit> splits2 =
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    List<IcebergSourceSplit> splits2 = createSplits(1, 1, "1");
     assertAvailableFuture(assigner, 1, () -> assigner.onUnassignedSplits(splits2));
 
-    assigner.onDiscoveredSplits(
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 2, 1));
+    assigner.onDiscoveredSplits(createSplits(2, 1, "1"));
     assertSnapshot(assigner, 2);
     assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
     assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
@@ -125,5 +120,11 @@ public abstract class SplitAssignerTestBase {
     Assert.assertEquals(splitCount, stateBeforeGet.size());
   }
 
+  protected List<IcebergSourceSplit> createSplits(int fileCount, int filesPerSplit, String version)
+      throws Exception {
+    return SplitHelpers.createSplitsFromTransientHadoopTable(
+        TEMPORARY_FOLDER, fileCount, filesPerSplit, version);
+  }
+
   protected abstract SplitAssigner splitAssigner();
 }
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
index 8b9e132e0e..e78634e6b8 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.flink.source.assigner;
 
 import java.util.List;
 import org.apache.iceberg.ContentFile;
-import org.apache.iceberg.flink.source.SplitHelpers;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.flink.source.split.SerializableComparator;
 import org.apache.iceberg.flink.source.split.SplitComparators;
@@ -40,9 +39,7 @@ public class TestFileSequenceNumberBasedSplitAssigner extends SplitAssignerTestB
   public void testMultipleFilesInAnIcebergSplit() {
     SplitAssigner assigner = splitAssigner();
     Assertions.assertThatThrownBy(
-            () ->
-                assigner.onDiscoveredSplits(
-                    SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 2, "2")),
+            () -> assigner.onDiscoveredSplits(createSplits(4, 2, "2")),
             "Multiple files in a split is not allowed")
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessageContaining("Please use 'split-open-file-cost'");
@@ -52,8 +49,7 @@ public class TestFileSequenceNumberBasedSplitAssigner extends SplitAssignerTestB
   @Test
   public void testSplitSort() throws Exception {
     SplitAssigner assigner = splitAssigner();
-    List<IcebergSourceSplit> splits =
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 5, 1, "2");
+    List<IcebergSourceSplit> splits = createSplits(5, 1, "2");
 
     assigner.onDiscoveredSplits(splits.subList(3, 5));
     assigner.onDiscoveredSplits(splits.subList(0, 1));
@@ -76,7 +72,7 @@ public class TestFileSequenceNumberBasedSplitAssigner extends SplitAssignerTestB
     Assert.assertNotNull(comparator);
   }
 
-  protected void assertGetNext(SplitAssigner assigner, Long expectedSequenceNumber) {
+  private void assertGetNext(SplitAssigner assigner, Long expectedSequenceNumber) {
     GetSplitResult result = assigner.getNext(null);
     ContentFile file = result.split().task().files().iterator().next().file();
     Assert.assertEquals(expectedSequenceNumber, file.fileSequenceNumber());
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java
new file mode 100644
index 0000000000..e1fc63fda9
--- /dev/null
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
+import org.apache.iceberg.flink.source.reader.ReaderUtil;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
+import org.apache.iceberg.flink.source.split.SplitComparators;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SerializationUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestWatermarkBasedSplitAssigner extends SplitAssignerTestBase {
+  public static final Schema SCHEMA =
+      new Schema(required(1, "timestamp_column", Types.TimestampType.withoutZone()));
+  private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA);
+
+  @Override
+  protected SplitAssigner splitAssigner() {
+    return new OrderedSplitAssignerFactory(
+            SplitComparators.watermark(
+                new ColumnStatsWatermarkExtractor(SCHEMA, "timestamp_column", null)))
+        .createAssigner();
+  }
+
+  /** Test the assigner when multiple files are in a single split */
+  @Test
+  public void testMultipleFilesInAnIcebergSplit() {
+    SplitAssigner assigner = splitAssigner();
+    assigner.onDiscoveredSplits(createSplits(4, 2, "2"));
+
+    assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+    assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+    assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
+  }
+
+  /** Test sorted splits */
+  @Test
+  public void testSplitSort() {
+    SplitAssigner assigner = splitAssigner();
+
+    Instant now = Instant.now();
+    List<IcebergSourceSplit> splits =
+        IntStream.range(0, 5)
+            .mapToObj(i -> splitFromInstant(now.plus(i, ChronoUnit.MINUTES)))
+            .collect(Collectors.toList());
+
+    assigner.onDiscoveredSplits(splits.subList(3, 5));
+    assigner.onDiscoveredSplits(splits.subList(0, 1));
+    assigner.onDiscoveredSplits(splits.subList(1, 3));
+
+    assertGetNext(assigner, splits.get(0));
+    assertGetNext(assigner, splits.get(1));
+    assertGetNext(assigner, splits.get(2));
+    assertGetNext(assigner, splits.get(3));
+    assertGetNext(assigner, splits.get(4));
+
+    assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
+  }
+
+  @Test
+  public void testSerializable() {
+    byte[] bytes =
+        SerializationUtil.serializeToBytes(
+            SplitComparators.watermark(
+                new ColumnStatsWatermarkExtractor(
+                    TestFixtures.SCHEMA, "id", TimeUnit.MILLISECONDS)));
+    SerializableComparator<IcebergSourceSplit> comparator =
+        SerializationUtil.deserializeFromBytes(bytes);
+    Assert.assertNotNull(comparator);
+  }
+
+  private void assertGetNext(SplitAssigner assigner, IcebergSourceSplit split) {
+    GetSplitResult result = assigner.getNext(null);
+    Assert.assertEquals(split, result.split());
+  }
+
+  @Override
+  protected List<IcebergSourceSplit> createSplits(
+      int fileCount, int filesPerSplit, String version) {
+    return IntStream.range(0, fileCount / filesPerSplit)
+        .mapToObj(
+            splitNum ->
+                splitFromRecords(
+                    IntStream.range(0, filesPerSplit)
+                        .mapToObj(
+                            fileNum ->
+                                RandomGenericData.generate(
+                                    SCHEMA, 2, splitNum * filesPerSplit + fileNum))
+                        .collect(Collectors.toList())))
+        .collect(Collectors.toList());
+  }
+
+  private IcebergSourceSplit splitFromInstant(Instant instant) {
+    Record record = GenericRecord.create(SCHEMA);
+    record.set(0, LocalDateTime.ofInstant(instant, ZoneOffset.UTC));
+    return splitFromRecords(ImmutableList.of(ImmutableList.of(record)));
+  }
+
+  private IcebergSourceSplit splitFromRecords(List<List<Record>> records) {
+    try {
+      return IcebergSourceSplit.fromCombinedScanTask(
+          ReaderUtil.createCombinedScanTask(
+              records, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY));
+    } catch (IOException e) {
+      throw new RuntimeException("Split creation exception", e);
+    }
+  }
+}
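
A rough sketch of the ordering contract that SplitComparators.watermark must
satisfy for the assigner tested above: splits are compared by the watermark
extracted from their column stats, oldest first. This is illustrative only; the
shipped comparator may differ, for instance in how it breaks ties:

    import java.io.Serializable;
    import java.util.Comparator;
    import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

    // Sketch: hand out splits containing older data first.
    final class WatermarkComparatorSketch implements Comparator<IcebergSourceSplit>, Serializable {
      interface Extractor extends Serializable {
        long extractWatermark(IcebergSourceSplit split);
      }

      private final Extractor extractor;

      WatermarkComparatorSketch(Extractor extractor) {
        this.extractor = extractor;
      }

      @Override
      public int compare(IcebergSourceSplit left, IcebergSourceSplit right) {
        int byWatermark =
            Long.compare(extractor.extractWatermark(left), extractor.extractWatermark(right));
        // Fall back to the split id so the ordering stays deterministic for equal watermarks.
        return byWatermark != 0 ? byWatermark : left.splitId().compareTo(right.splitId());
      }
    }
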
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
index f9ceaf8422..2a2503ef24 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
@@ -60,9 +60,12 @@ public class ReaderUtil {
       FileFormat fileFormat,
       FileAppenderFactory<Record> appenderFactory)
       throws IOException {
-    try (FileAppender<Record> appender =
-        appenderFactory.newAppender(Files.localOutput(file), fileFormat)) {
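+    // Keep the appender in scope instead of using try-with-resources: metrics() is read below,
+    // after close(), to populate the DataFile statistics.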
+    FileAppender<Record> appender =
+        appenderFactory.newAppender(Files.localOutput(file), fileFormat);
+    try {
       appender.addAll(records);
+    } finally {
+      appender.close();
     }
 
     DataFile dataFile =
@@ -71,6 +74,7 @@ public class ReaderUtil {
             .withFileSizeInBytes(file.length())
             .withPath(file.toString())
             .withFormat(fileFormat)
+            .withMetrics(appender.metrics())
             .build();
 
     ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(Expressions.alwaysTrue());
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
new file mode 100644
index 0000000000..afe8a5d015
--- /dev/null
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestColumnStatsWatermarkExtractor {
+  public static final Schema SCHEMA =
+      new Schema(
+          required(1, "timestamp_column", Types.TimestampType.withoutZone()),
+          required(2, "timestamptz_column", Types.TimestampType.withZone()),
+          required(3, "long_column", Types.LongType.get()),
+          required(4, "string_column", Types.StringType.get()));
+
+  private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA);
+
+  private static final List<List<Record>> TEST_RECORDS =
+      ImmutableList.of(
+          RandomGenericData.generate(SCHEMA, 3, 2L), RandomGenericData.generate(SCHEMA, 3, 19L));
+
+  private static final List<Map<String, Long>> MIN_VALUES =
+      ImmutableList.of(Maps.newHashMapWithExpectedSize(3), Maps.newHashMapWithExpectedSize(3));
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, SCHEMA);
+
+  private final String columnName;
+
+  @BeforeClass
+  public static void updateMinValue() {
+    for (int i = 0; i < TEST_RECORDS.size(); ++i) {
+      for (Record r : TEST_RECORDS.get(i)) {
+        Map<String, Long> minValues = MIN_VALUES.get(i);
+
+        LocalDateTime localDateTime = (LocalDateTime) r.get(0);
+        minValues.merge(
+            "timestamp_column", 
localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli(), Math::min);
+
+        OffsetDateTime offsetDateTime = (OffsetDateTime) r.get(1);
+        minValues.merge("timestamptz_column", 
offsetDateTime.toInstant().toEpochMilli(), Math::min);
+
+        minValues.merge("long_column", (Long) r.get(2), Math::min);
+      }
+    }
+  }
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> data() {
+    return ImmutableList.of(
+        new Object[] {"timestamp_column"},
+        new Object[] {"timestamptz_column"},
+        new Object[] {"long_column"});
+  }
+
+  public TestColumnStatsWatermarkExtractor(String columnName) {
+    this.columnName = columnName;
+  }
+
+  @Test
+  public void testSingle() throws IOException {
+    ColumnStatsWatermarkExtractor extractor =
+        new ColumnStatsWatermarkExtractor(SCHEMA, columnName, TimeUnit.MILLISECONDS);
+
+    Assert.assertEquals(
+        MIN_VALUES.get(0).get(columnName).longValue(), extractor.extractWatermark(split(0)));
+  }
+
+  @Test
+  public void testTimeUnit() throws IOException {
+    Assume.assumeTrue("Run only for long column", 
columnName.equals("long_column"));
+    ColumnStatsWatermarkExtractor extractor =
+        new ColumnStatsWatermarkExtractor(SCHEMA, columnName, TimeUnit.MICROSECONDS);
+
+    Assert.assertEquals(
+        MIN_VALUES.get(0).get(columnName).longValue() / 1000L,
+        extractor.extractWatermark(split(0)));
+  }
+
+  @Test
+  public void testMultipleFiles() throws IOException {
+    Assume.assumeTrue("Run only for the timestamp column", 
columnName.equals("timestamp_column"));
+    IcebergSourceSplit combinedSplit =
+        IcebergSourceSplit.fromCombinedScanTask(
+            ReaderUtil.createCombinedScanTask(
+                TEST_RECORDS, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY));
+
+    ColumnStatsWatermarkExtractor extractor =
+        new ColumnStatsWatermarkExtractor(SCHEMA, columnName, null);
+
+    Assert.assertEquals(
+        MIN_VALUES.get(0).get(columnName).longValue(), extractor.extractWatermark(split(0)));
+    Assert.assertEquals(
+        MIN_VALUES.get(1).get(columnName).longValue(), extractor.extractWatermark(split(1)));
+    Assert.assertEquals(
+        Math.min(MIN_VALUES.get(0).get(columnName), MIN_VALUES.get(1).get(columnName)),
+        extractor.extractWatermark(combinedSplit));
+  }
+
+  @Test
+  public void testWrongColumn() {
+    Assume.assumeTrue("Run only for string column", 
columnName.equals("string_column"));
+    Assertions.assertThatThrownBy(() -> new 
ColumnStatsWatermarkExtractor(SCHEMA, columnName, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining(
+            "Found STRING, expected a LONG or TIMESTAMP column for watermark 
generation.");
+  }
+
+  @Test
+  public void testEmptyStatistics() throws IOException {
+    Assume.assumeTrue("Run only for timestamp column", 
columnName.equals("timestamp_column"));
+
+    // Create an extractor for a column for which we do not have statistics
+    ColumnStatsWatermarkExtractor extractor =
+        new ColumnStatsWatermarkExtractor(10, "missing_field");
+    Assertions.assertThatThrownBy(() -> extractor.extractWatermark(split(0)))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Missing statistics for column");
+  }
+
+  private IcebergSourceSplit split(int id) throws IOException {
+    return IcebergSourceSplit.fromCombinedScanTask(
+        ReaderUtil.createCombinedScanTask(
+            ImmutableList.of(TEST_RECORDS.get(id)),
+            TEMPORARY_FOLDER,
+            FileFormat.PARQUET,
+            APPENDER_FACTORY));
+  }
+}
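
The invariant these tests pin down: a split's watermark is the minimum lower
bound of the chosen column across all files in the split, converted to
milliseconds. A simplified sketch assuming a LONG column holding values in the
given time unit (the real extractor also handles TIMESTAMP columns and fails on
missing statistics):

    import java.nio.ByteBuffer;
    import java.util.concurrent.TimeUnit;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
    import org.apache.iceberg.types.Conversions;
    import org.apache.iceberg.types.Types;

    final class MinLowerBoundSketch {
      // Returns the smallest lower bound of the given LONG column across all files
      // in the split, converted from the given unit to milliseconds.
      static long extractWatermark(IcebergSourceSplit split, int fieldId, TimeUnit unit) {
        long min = Long.MAX_VALUE;
        for (FileScanTask task : split.task().files()) {
          ByteBuffer lowerBound = task.file().lowerBounds().get(fieldId);
          Long value = Conversions.fromByteBuffer(Types.LongType.get(), lowerBound);
          min = Math.min(min, unit.toMillis(value));
        }
        return min;
      }
    }
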
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
index def4f43685..88234c6112 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
@@ -167,7 +167,12 @@ public class TestIcebergSourceReader {
             new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
             new PlaintextEncryptionManager(),
             Collections.emptyList());
-    return new IcebergSourceReader<>(readerMetrics, readerFunction, splitComparator, readerContext);
+    return new IcebergSourceReader<>(
+        SerializableRecordEmitter.defaultEmitter(),
+        readerMetrics,
+        readerFunction,
+        splitComparator,
+        readerContext);
   }
 
   private static class IdBasedComparator implements SerializableComparator<IcebergSourceSplit> {

