This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 006591c876 Flink: Backport to 1.15 and 1.16: Add possibility of 
ordering the splits based on the file sequence number (#7661) (#7889)
006591c876 is described below

commit 006591c876f6dfb8a0877f5c55d540359f84fcf2
Author: pvary <[email protected]>
AuthorDate: Mon Jun 26 09:48:59 2023 +0200

    Flink: Backport to 1.15 and 1.16: Add possibility of ordering the splits 
based on the file sequence number (#7661) (#7889)
---
 .../apache/iceberg/flink/source/IcebergSource.java | 25 ++++++-
 .../source/assigner/DefaultSplitAssigner.java}     | 20 +++---
 .../assigner/OrderedSplitAssignerFactory.java}     | 22 ++++--
 .../assigner/SimpleSplitAssignerFactory.java       |  9 +--
 .../flink/source/reader/IcebergSourceReader.java   |  4 +-
 .../source/reader/IcebergSourceSplitReader.java    | 17 ++++-
 .../source/split/SerializableComparator.java}      | 20 ++----
 .../flink/source/split/SplitComparators.java       | 59 +++++++++++++++
 .../apache/iceberg/flink/source/SplitHelpers.java  | 45 ++++++++++--
 ...litAssigner.java => SplitAssignerTestBase.java} | 20 +++---
 .../source/assigner/TestDefaultSplitAssigner.java} | 27 ++++---
 .../TestFileSequenceNumberBasedSplitAssigner.java  | 84 ++++++++++++++++++++++
 .../TestContinuousIcebergEnumerator.java           |  4 +-
 .../source/reader/TestIcebergSourceReader.java     | 71 +++++++++++++++++-
 .../apache/iceberg/flink/source/IcebergSource.java | 25 ++++++-
 .../source/assigner/DefaultSplitAssigner.java}     | 20 +++---
 ...ctory.java => OrderedSplitAssignerFactory.java} | 22 ++++--
 .../assigner/SimpleSplitAssignerFactory.java       |  9 +--
 .../flink/source/reader/IcebergSourceReader.java   |  4 +-
 .../source/reader/IcebergSourceSplitReader.java    | 17 ++++-
 .../SerializableComparator.java}                   | 20 ++----
 .../flink/source/split/SplitComparators.java       | 59 +++++++++++++++
 .../apache/iceberg/flink/source/SplitHelpers.java  | 45 ++++++++++--
 ...litAssigner.java => SplitAssignerTestBase.java} | 20 +++---
 .../source/assigner/TestDefaultSplitAssigner.java} | 27 ++++---
 .../TestFileSequenceNumberBasedSplitAssigner.java  | 84 ++++++++++++++++++++++
 .../TestContinuousIcebergEnumerator.java           |  4 +-
 .../source/reader/TestIcebergSourceReader.java     | 71 +++++++++++++++++-
 28 files changed, 722 insertions(+), 132 deletions(-)

diff --git 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index cbdd184870..d3859452a2 100644
--- 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -48,6 +48,8 @@ import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.FlinkReadOptions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.source.assigner.OrderedSplitAssignerFactory;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
 import org.apache.iceberg.flink.source.assigner.SplitAssigner;
 import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
 import org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator;
@@ -63,6 +65,7 @@ import org.apache.iceberg.flink.source.reader.ReaderFunction;
 import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
@@ -76,6 +79,7 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
   private final ScanContext scanContext;
   private final ReaderFunction<T> readerFunction;
   private final SplitAssignerFactory assignerFactory;
+  private final SerializableComparator<IcebergSourceSplit> splitComparator;
 
   // Can't use SerializableTable as enumerator needs a regular table
   // that can discover table changes
@@ -86,11 +90,13 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
       ScanContext scanContext,
       ReaderFunction<T> readerFunction,
       SplitAssignerFactory assignerFactory,
+      SerializableComparator<IcebergSourceSplit> splitComparator,
       Table table) {
     this.tableLoader = tableLoader;
     this.scanContext = scanContext;
     this.readerFunction = readerFunction;
     this.assignerFactory = assignerFactory;
+    this.splitComparator = splitComparator;
     this.table = table;
   }
 
@@ -146,7 +152,7 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
   public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext 
readerContext) {
     IcebergSourceReaderMetrics metrics =
         new IcebergSourceReaderMetrics(readerContext.metricGroup(), 
lazyTable().name());
-    return new IcebergSourceReader<>(metrics, readerFunction, readerContext);
+    return new IcebergSourceReader<>(metrics, readerFunction, splitComparator, 
readerContext);
   }
 
   @Override
@@ -209,6 +215,7 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
     private TableLoader tableLoader;
     private Table table;
     private SplitAssignerFactory splitAssignerFactory;
+    private SerializableComparator<IcebergSourceSplit> splitComparator;
     private ReaderFunction<T> readerFunction;
     private ReadableConfig flinkConfig = new Configuration();
     private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -234,6 +241,12 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
       return this;
     }
 
+    public Builder<T> splitComparator(
+        SerializableComparator<IcebergSourceSplit> newSplitComparator) {
+      this.splitComparator = newSplitComparator;
+      return this;
+    }
+
     public Builder<T> readerFunction(ReaderFunction<T> newReaderFunction) {
       this.readerFunction = newReaderFunction;
       return this;
@@ -462,10 +475,18 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
         }
       }
 
+      if (splitAssignerFactory == null) {
+        if (splitComparator == null) {
+          splitAssignerFactory = new SimpleSplitAssignerFactory();
+        } else {
+          splitAssignerFactory = new 
OrderedSplitAssignerFactory(splitComparator);
+        }
+      }
+
       checkRequired();
       // Since builder already load the table, pass it to the source to avoid 
double loading
       return new IcebergSource<T>(
-          tableLoader, context, readerFunction, splitAssignerFactory, table);
+          tableLoader, context, readerFunction, splitAssignerFactory, 
splitComparator, table);
     }
 
     private void checkRequired() {
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
similarity index 82%
rename from 
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
rename to 
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
index b43660f907..37a0f1a605 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
+++ 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
@@ -20,7 +20,8 @@ package org.apache.iceberg.flink.source.assigner;
 
 import java.util.ArrayDeque;
 import java.util.Collection;
-import java.util.Deque;
+import java.util.PriorityQueue;
+import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
@@ -28,24 +29,27 @@ import org.apache.flink.annotation.Internal;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
 
 /**
  * Since all methods are called in the source coordinator thread by 
enumerator, there is no need for
  * locking.
  */
 @Internal
-public class SimpleSplitAssigner implements SplitAssigner {
+public class DefaultSplitAssigner implements SplitAssigner {
 
-  private final Deque<IcebergSourceSplit> pendingSplits;
+  private final Queue<IcebergSourceSplit> pendingSplits;
   private CompletableFuture<Void> availableFuture;
 
-  public SimpleSplitAssigner() {
-    this.pendingSplits = new ArrayDeque<>();
+  public DefaultSplitAssigner(SerializableComparator<IcebergSourceSplit> 
comparator) {
+    this.pendingSplits = comparator == null ? new ArrayDeque<>() : new 
PriorityQueue<>(comparator);
   }
 
-  public SimpleSplitAssigner(Collection<IcebergSourceSplitState> 
assignerState) {
-    this.pendingSplits = new ArrayDeque<>(assignerState.size());
-    // Because simple assigner only tracks unassigned splits,
+  public DefaultSplitAssigner(
+      SerializableComparator<IcebergSourceSplit> comparator,
+      Collection<IcebergSourceSplitState> assignerState) {
+    this(comparator);
+    // Because default assigner only tracks unassigned splits,
     // there is no need to filter splits based on status (unassigned) here.
     assignerState.forEach(splitState -> pendingSplits.add(splitState.split()));
   }
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/OrderedSplitAssignerFactory.java
similarity index 54%
copy from 
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
copy to 
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/OrderedSplitAssignerFactory.java
index 1c14f4fcf9..e58478897a 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
+++ 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/OrderedSplitAssignerFactory.java
@@ -19,18 +19,28 @@
 package org.apache.iceberg.flink.source.assigner;
 
 import java.util.Collection;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
 
-/** Create simple assigner that hands out splits without any guarantee in 
order or locality. */
-public class SimpleSplitAssignerFactory implements SplitAssignerFactory {
+/**
+ * Create default assigner with a comparator that hands out splits where the 
order of the splits
+ * will be defined by the {@link SerializableComparator}.
+ */
+public class OrderedSplitAssignerFactory implements SplitAssignerFactory {
+  private final SerializableComparator<IcebergSourceSplit> comparator;
+
+  public 
OrderedSplitAssignerFactory(SerializableComparator<IcebergSourceSplit> 
comparator) {
+    this.comparator = comparator;
+  }
 
   @Override
-  public SimpleSplitAssigner createAssigner() {
-    return new SimpleSplitAssigner();
+  public SplitAssigner createAssigner() {
+    return new DefaultSplitAssigner(comparator);
   }
 
   @Override
-  public SimpleSplitAssigner 
createAssigner(Collection<IcebergSourceSplitState> assignerState) {
-    return new SimpleSplitAssigner(assignerState);
+  public SplitAssigner createAssigner(Collection<IcebergSourceSplitState> 
assignerState) {
+    return new DefaultSplitAssigner(comparator, assignerState);
   }
 }
diff --git 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
index 1c14f4fcf9..a2e2ff364d 100644
--- 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
+++ 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
@@ -23,14 +23,15 @@ import 
org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
 
 /** Create simple assigner that hands out splits without any guarantee in 
order or locality. */
 public class SimpleSplitAssignerFactory implements SplitAssignerFactory {
+  public SimpleSplitAssignerFactory() {}
 
   @Override
-  public SimpleSplitAssigner createAssigner() {
-    return new SimpleSplitAssigner();
+  public SplitAssigner createAssigner() {
+    return new DefaultSplitAssigner(null);
   }
 
   @Override
-  public SimpleSplitAssigner 
createAssigner(Collection<IcebergSourceSplitState> assignerState) {
-    return new SimpleSplitAssigner(assignerState);
+  public SplitAssigner createAssigner(Collection<IcebergSourceSplitState> 
assignerState) {
+    return new DefaultSplitAssigner(null, assignerState);
   }
 }
diff --git 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
index 84c98055ce..8d7d68f961 100644
--- 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
+++ 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
@@ -25,6 +25,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
 import org.apache.iceberg.flink.source.split.SplitRequestEvent;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
@@ -36,9 +37,10 @@ public class IcebergSourceReader<T>
   public IcebergSourceReader(
       IcebergSourceReaderMetrics metrics,
       ReaderFunction<T> readerFunction,
+      SerializableComparator<IcebergSourceSplit> splitComparator,
       SourceReaderContext context) {
     super(
-        () -> new IcebergSourceSplitReader<>(metrics, readerFunction, context),
+        () -> new IcebergSourceSplitReader<>(metrics, readerFunction, 
splitComparator, context),
         new IcebergSourceRecordEmitter<>(),
         context.getConfiguration(),
         context);
diff --git 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
index bb016671e6..4e270dfa3d 100644
--- 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
+++ 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayDeque;
 import java.util.Collections;
+import java.util.List;
 import java.util.Queue;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.connector.base.source.reader.RecordsBySplits;
@@ -31,7 +32,9 @@ import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +43,7 @@ class IcebergSourceSplitReader<T> implements 
SplitReader<RecordAndPosition<T>, I
 
   private final IcebergSourceReaderMetrics metrics;
   private final ReaderFunction<T> openSplitFunction;
+  private final SerializableComparator<IcebergSourceSplit> splitComparator;
   private final int indexOfSubtask;
   private final Queue<IcebergSourceSplit> splits;
 
@@ -50,9 +54,11 @@ class IcebergSourceSplitReader<T> implements 
SplitReader<RecordAndPosition<T>, I
   IcebergSourceSplitReader(
       IcebergSourceReaderMetrics metrics,
       ReaderFunction<T> openSplitFunction,
+      SerializableComparator<IcebergSourceSplit> splitComparator,
       SourceReaderContext context) {
     this.metrics = metrics;
     this.openSplitFunction = openSplitFunction;
+    this.splitComparator = splitComparator;
     this.indexOfSubtask = context.getIndexOfSubtask();
     this.splits = new ArrayDeque<>();
   }
@@ -93,8 +99,15 @@ class IcebergSourceSplitReader<T> implements 
SplitReader<RecordAndPosition<T>, I
           String.format("Unsupported split change: %s", 
splitsChange.getClass()));
     }
 
-    LOG.info("Add {} splits to reader", splitsChange.splits().size());
-    splits.addAll(splitsChange.splits());
+    if (splitComparator != null) {
+      List<IcebergSourceSplit> newSplits = 
Lists.newArrayList(splitsChange.splits());
+      newSplits.sort(splitComparator);
+      LOG.info("Add {} splits to reader: {}", newSplits.size(), newSplits);
+      splits.addAll(newSplits);
+    } else {
+      LOG.info("Add {} splits to reader", splitsChange.splits().size());
+      splits.addAll(splitsChange.splits());
+    }
     metrics.incrementAssignedSplits(splitsChange.splits().size());
     metrics.incrementAssignedBytes(calculateBytes(splitsChange));
   }
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializableComparator.java
similarity index 57%
copy from 
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
copy to 
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializableComparator.java
index 1c14f4fcf9..319648ca27 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
+++ 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializableComparator.java
@@ -16,21 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.flink.source.assigner;
+package org.apache.iceberg.flink.source.split;
 
-import java.util.Collection;
-import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import java.io.Serializable;
+import java.util.Comparator;
 
-/** Create simple assigner that hands out splits without any guarantee in 
order or locality. */
-public class SimpleSplitAssignerFactory implements SplitAssignerFactory {
-
-  @Override
-  public SimpleSplitAssigner createAssigner() {
-    return new SimpleSplitAssigner();
-  }
-
-  @Override
-  public SimpleSplitAssigner 
createAssigner(Collection<IcebergSourceSplitState> assignerState) {
-    return new SimpleSplitAssigner(assignerState);
-  }
-}
+public interface SerializableComparator<T> extends Comparator<T>, Serializable 
{}
diff --git 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
new file mode 100644
index 0000000000..64e03d77de
--- /dev/null
+++ 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.split;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Provides implementations of {@link 
org.apache.iceberg.flink.source.split.SerializableComparator}
+ * which could be used for ordering splits. These are used by the {@link
+ * org.apache.iceberg.flink.source.assigner.OrderedSplitAssignerFactory} and 
the {@link
+ * org.apache.iceberg.flink.source.reader.IcebergSourceReader}
+ */
+public class SplitComparators {
+  private SplitComparators() {}
+
+  /** Comparator which orders the splits based on the file sequence number of 
the data files */
+  public static SerializableComparator<IcebergSourceSplit> 
fileSequenceNumber() {
+    return (IcebergSourceSplit o1, IcebergSourceSplit o2) -> {
+      Preconditions.checkArgument(
+          o1.task().files().size() == 1 && o2.task().files().size() == 1,
+          "Could not compare combined task. Please use 'split-open-file-cost' 
to prevent combining multiple files to a split");
+
+      Long seq1 = 
o1.task().files().iterator().next().file().fileSequenceNumber();
+      Long seq2 = 
o2.task().files().iterator().next().file().fileSequenceNumber();
+
+      Preconditions.checkNotNull(
+          seq1,
+          "Invalid file sequence number: null. Doesn't support splits written 
with V1 format: %s",
+          o1);
+      Preconditions.checkNotNull(
+          seq2,
+          "IInvalid file sequence number: null. Doesn't support splits written 
with V1 format: %s",
+          o2);
+
+      int temp = Long.compare(seq1, seq2);
+      if (temp != 0) {
+        return temp;
+      } else {
+        return o1.splitId().compareTo(o2.splitId());
+      }
+    };
+  }
+}
diff --git 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
index 8dc68aad10..3a8071523b 100644
--- 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
+++ 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
@@ -20,19 +20,21 @@ package org.apache.iceberg.flink.source;
 
 import java.io.File;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.BaseCombinedScanTask;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.ThreadPools;
 import org.junit.Assert;
@@ -40,8 +42,6 @@ import org.junit.rules.TemporaryFolder;
 
 public class SplitHelpers {
 
-  private static final AtomicLong splitLengthIncrement = new AtomicLong();
-
   private SplitHelpers() {}
 
   /**
@@ -54,16 +54,53 @@ public class SplitHelpers {
    *
    *     <p>Since the table and data files are deleted before this method 
return, caller shouldn't
    *     attempt to read the data files.
+   *
+   *     <p>By default, v1 Iceberg table is created. For v2 table use {@link
+   *     SplitHelpers#createSplitsFromTransientHadoopTable(TemporaryFolder, 
int, int, String)}
+   *
+   * @param temporaryFolder Folder to place the data to
+   * @param fileCount The number of files to create and add to the table
+   * @param filesPerSplit The number of files used for a split
    */
   public static List<IcebergSourceSplit> createSplitsFromTransientHadoopTable(
       TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit) 
throws Exception {
+    return createSplitsFromTransientHadoopTable(temporaryFolder, fileCount, 
filesPerSplit, "1");
+  }
+
+  /**
+   * This create a list of IcebergSourceSplit from real files
+   * <li>Create a new Hadoop table under the {@code temporaryFolder}
+   * <li>write {@code fileCount} number of files to the new Iceberg table
+   * <li>Discover the splits from the table and partition the splits by the 
{@code filePerSplit}
+   *     limit
+   * <li>Delete the Hadoop table
+   *
+   *     <p>Since the table and data files are deleted before this method 
return, caller shouldn't
+   *     attempt to read the data files.
+   *
+   * @param temporaryFolder Folder to place the data to
+   * @param fileCount The number of files to create and add to the table
+   * @param filesPerSplit The number of files used for a split
+   * @param version The table version to create
+   */
+  public static List<IcebergSourceSplit> createSplitsFromTransientHadoopTable(
+      TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit, 
String version)
+      throws Exception {
     final File warehouseFile = temporaryFolder.newFolder();
     Assert.assertTrue(warehouseFile.delete());
     final String warehouse = "file:" + warehouseFile;
     Configuration hadoopConf = new Configuration();
     final HadoopCatalog catalog = new HadoopCatalog(hadoopConf, warehouse);
+    ImmutableMap<String, String> properties =
+        ImmutableMap.of(TableProperties.FORMAT_VERSION, version);
     try {
-      final Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, 
TestFixtures.SCHEMA);
+      final Table table =
+          catalog.createTable(
+              TestFixtures.TABLE_IDENTIFIER,
+              TestFixtures.SCHEMA,
+              PartitionSpec.unpartitioned(),
+              null,
+              properties);
       final GenericAppenderHelper dataAppender =
           new GenericAppenderHelper(table, FileFormat.PARQUET, 
temporaryFolder);
       for (int i = 0; i < fileCount; ++i) {
diff --git 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestSimpleSplitAssigner.java
 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
similarity index 88%
rename from 
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestSimpleSplitAssigner.java
rename to 
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
index ee6c5cc3a6..f28677ca9d 100644
--- 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestSimpleSplitAssigner.java
+++ 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
@@ -30,22 +30,24 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-public class TestSimpleSplitAssigner {
+public abstract class SplitAssignerTestBase {
   @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
 
   @Test
   public void testEmptyInitialization() {
-    SimpleSplitAssigner assigner = new SimpleSplitAssigner();
+    SplitAssigner assigner = splitAssigner();
     assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
   }
 
   /** Test a sequence of interactions for StaticEnumerator */
   @Test
   public void testStaticEnumeratorSequence() throws Exception {
-    SimpleSplitAssigner assigner = new SimpleSplitAssigner();
+    SplitAssigner assigner = splitAssigner();
     assigner.onDiscoveredSplits(
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 
2));
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 
1));
 
+    assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+    assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
     assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
     assertSnapshot(assigner, 1);
     assigner.onUnassignedSplits(
@@ -61,7 +63,7 @@ public class TestSimpleSplitAssigner {
   /** Test a sequence of interactions for ContinuousEnumerator */
   @Test
   public void testContinuousEnumeratorSequence() throws Exception {
-    SimpleSplitAssigner assigner = new SimpleSplitAssigner();
+    SplitAssigner assigner = splitAssigner();
     assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
 
     List<IcebergSourceSplit> splits1 =
@@ -81,7 +83,7 @@ public class TestSimpleSplitAssigner {
   }
 
   private void assertAvailableFuture(
-      SimpleSplitAssigner assigner, int splitCount, Runnable 
addSplitsRunnable) {
+      SplitAssigner assigner, int splitCount, Runnable addSplitsRunnable) {
     // register callback
     AtomicBoolean futureCompleted = new AtomicBoolean();
     CompletableFuture<Void> future = assigner.isAvailable();
@@ -102,7 +104,7 @@ public class TestSimpleSplitAssigner {
     assertSnapshot(assigner, 0);
   }
 
-  private void assertGetNext(SimpleSplitAssigner assigner, 
GetSplitResult.Status expectedStatus) {
+  protected void assertGetNext(SplitAssigner assigner, GetSplitResult.Status 
expectedStatus) {
     GetSplitResult result = assigner.getNext(null);
     Assert.assertEquals(expectedStatus, result.status());
     switch (expectedStatus) {
@@ -118,8 +120,10 @@ public class TestSimpleSplitAssigner {
     }
   }
 
-  private void assertSnapshot(SimpleSplitAssigner assigner, int splitCount) {
+  protected void assertSnapshot(SplitAssigner assigner, int splitCount) {
     Collection<IcebergSourceSplitState> stateBeforeGet = assigner.state();
     Assert.assertEquals(splitCount, stateBeforeGet.size());
   }
+
+  protected abstract SplitAssigner splitAssigner();
 }
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestDefaultSplitAssigner.java
similarity index 52%
copy from 
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
copy to 
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestDefaultSplitAssigner.java
index 1c14f4fcf9..8994f3054a 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
+++ 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestDefaultSplitAssigner.java
@@ -18,19 +18,26 @@
  */
 package org.apache.iceberg.flink.source.assigner;
 
-import java.util.Collection;
-import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
-
-/** Create simple assigner that hands out splits without any guarantee in 
order or locality. */
-public class SimpleSplitAssignerFactory implements SplitAssignerFactory {
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.junit.Test;
 
+public class TestDefaultSplitAssigner extends SplitAssignerTestBase {
   @Override
-  public SimpleSplitAssigner createAssigner() {
-    return new SimpleSplitAssigner();
+  protected SplitAssigner splitAssigner() {
+    return new DefaultSplitAssigner(null);
   }
 
-  @Override
-  public SimpleSplitAssigner 
createAssigner(Collection<IcebergSourceSplitState> assignerState) {
-    return new SimpleSplitAssigner(assignerState);
+  /** Test the assigner when multiple files are in a single split */
+  @Test
+  public void testMultipleFilesInASplit() throws Exception {
+    SplitAssigner assigner = splitAssigner();
+    assigner.onDiscoveredSplits(
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 
2));
+
+    assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+    assertSnapshot(assigner, 1);
+    assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+    assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
+    assertSnapshot(assigner, 0);
   }
 }
diff --git 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
new file mode 100644
index 0000000000..8b9e132e0e
--- /dev/null
+++ 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.List;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
+import org.apache.iceberg.flink.source.split.SplitComparators;
+import org.apache.iceberg.util.SerializationUtil;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the assigner created by {@link OrderedSplitAssignerFactory} when it is configured with
+ * the comparator returned by {@link SplitComparators#fileSequenceNumber()}.
+ */
+public class TestFileSequenceNumberBasedSplitAssigner extends SplitAssignerTestBase {
+  @Override
+  protected SplitAssigner splitAssigner() {
+    return new OrderedSplitAssignerFactory(SplitComparators.fileSequenceNumber()).createAssigner();
+  }
+
+  /** Test the assigner when multiple files are in a single split */
+  @Test
+  public void testMultipleFilesInAnIcebergSplit() {
+    SplitAssigner assigner = splitAssigner();
+    // 4 files with 2 files per split: adding such splits is expected to be rejected
+    Assertions.assertThatThrownBy(
+            () ->
+                assigner.onDiscoveredSplits(
+                    SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 2, "2")),
+            "Multiple files in a split is not allowed")
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Please use 'split-open-file-cost'");
+  }
+
+  /** Test sorted splits */
+  @Test
+  public void testSplitSort() throws Exception {
+    SplitAssigner assigner = splitAssigner();
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 5, 1, "2");
+
+    // Hand the splits to the assigner out of order
+    assigner.onDiscoveredSplits(splits.subList(3, 5));
+    assigner.onDiscoveredSplits(splits.subList(0, 1));
+    assigner.onDiscoveredSplits(splits.subList(1, 3));
+
+    // The splits have to come back ordered by file sequence number
+    assertGetNext(assigner, 1L);
+    assertGetNext(assigner, 2L);
+    assertGetNext(assigner, 3L);
+    assertGetNext(assigner, 4L);
+    assertGetNext(assigner, 5L);
+
+    assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
+  }
+
+  /** Test that the comparator survives a serialization round trip */
+  @Test
+  public void testSerializable() {
+    byte[] bytes = SerializationUtil.serializeToBytes(SplitComparators.fileSequenceNumber());
+    SerializableComparator<IcebergSourceSplit> comparator =
+        SerializationUtil.deserializeFromBytes(bytes);
+    Assert.assertNotNull(comparator);
+  }
+
+  /**
+   * Fetches the next split from the assigner and checks the file sequence number of the first
+   * file in that split.
+   *
+   * @param assigner the assigner to fetch the next split from
+   * @param expectedSequenceNumber the file sequence number the first file of the split must have
+   */
+  protected void assertGetNext(SplitAssigner assigner, Long expectedSequenceNumber) {
+    GetSplitResult result = assigner.getNext(null);
+    // Check the status first, so an unexpected result is reported as a status mismatch
+    Assert.assertEquals(GetSplitResult.Status.AVAILABLE, result.status());
+    ContentFile<?> file = result.split().task().files().iterator().next().file();
+    Assert.assertEquals(expectedSequenceNumber, file.fileSequenceNumber());
+  }
+}
diff --git 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
index d0ae8fdf77..349eb11cf5 100644
--- 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
+++ 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
@@ -28,7 +28,7 @@ import 
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumerator
 import org.apache.iceberg.flink.source.ScanContext;
 import org.apache.iceberg.flink.source.SplitHelpers;
 import org.apache.iceberg.flink.source.StreamingStartingStrategy;
-import org.apache.iceberg.flink.source.assigner.SimpleSplitAssigner;
+import org.apache.iceberg.flink.source.assigner.DefaultSplitAssigner;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
@@ -342,7 +342,7 @@ public class TestContinuousIcebergEnumerator {
     ContinuousIcebergEnumerator enumerator =
         new ContinuousIcebergEnumerator(
             context,
-            new SimpleSplitAssigner(Collections.emptyList()),
+            new DefaultSplitAssigner(null, Collections.emptyList()),
             scanContext,
             splitPlanner,
             null);
diff --git 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
index 56af0caf12..def4f43685 100644
--- 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
+++ 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
@@ -35,6 +35,7 @@ import 
org.apache.iceberg.encryption.PlaintextEncryptionManager;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.junit.Assert;
 import org.junit.ClassRule;
@@ -55,13 +56,68 @@ public class TestIcebergSourceReader {
     TestingReaderOutput<RowData> readerOutput = new TestingReaderOutput<>();
     TestingMetricGroup metricGroup = new TestingMetricGroup();
     TestingReaderContext readerContext = new TestingReaderContext(new 
Configuration(), metricGroup);
-    IcebergSourceReader reader = createReader(metricGroup, readerContext);
+    IcebergSourceReader reader = createReader(metricGroup, readerContext, 
null);
     reader.start();
 
     testOneSplitFetcher(reader, readerOutput, metricGroup, 1);
     testOneSplitFetcher(reader, readerOutput, metricGroup, 2);
   }
 
+  /**
+   * Reads the same two scan tasks twice, adding the splits to the reader in opposite orders, and
+   * checks that both runs emit equal records at each position.
+   */
+  @Test
+  public void testReaderOrder() throws Exception {
+    // Create 2 splits
+    CombinedScanTask task1 =
+        ReaderUtil.createCombinedScanTask(
+            ReaderUtil.createRecordBatchList(0L, TestFixtures.SCHEMA, 1, 1),
+            TEMPORARY_FOLDER,
+            FileFormat.PARQUET,
+            appenderFactory);
+    CombinedScanTask task2 =
+        ReaderUtil.createCombinedScanTask(
+            ReaderUtil.createRecordBatchList(1L, TestFixtures.SCHEMA, 1, 1),
+            TEMPORARY_FOLDER,
+            FileFormat.PARQUET,
+            appenderFactory);
+
+    // Sort the splits in one way; new split instances are created for every run
+    List<IcebergSourceSplit> forwardSplits =
+        Arrays.asList(
+            IcebergSourceSplit.fromCombinedScanTask(task1),
+            IcebergSourceSplit.fromCombinedScanTask(task2));
+    List<RowData> forwardRows = read(forwardSplits, 2);
+
+    // Reverse the splits
+    List<IcebergSourceSplit> reversedSplits =
+        Arrays.asList(
+            IcebergSourceSplit.fromCombinedScanTask(task2),
+            IcebergSourceSplit.fromCombinedScanTask(task1));
+    List<RowData> reversedRows = read(reversedSplits, 2);
+
+    // Check that the order of the elements is not changed
+    for (int position = 0; position < 2; position++) {
+      Assert.assertEquals(forwardRows.get(position), reversedRows.get(position));
+    }
+  }
+
+  /**
+   * Creates a new reader using {@link IdBasedComparator}, adds the given splits and polls until
+   * the expected number of records is emitted.
+   *
+   * @param splits the splits to add to the reader
+   * @param expected the exact number of records the reader has to emit
+   * @return the records emitted by the reader
+   */
+  private List<RowData> read(List<IcebergSourceSplit> splits, long expected) throws Exception {
+    TestingMetricGroup metricGroup = new TestingMetricGroup();
+    TestingReaderContext readerContext = new TestingReaderContext(new Configuration(), metricGroup);
+    TestingReaderOutput<RowData> output = new TestingReaderOutput<>();
+
+    // Using IdBasedComparator, so we can have a deterministic order of the splits
+    IcebergSourceReader reader = createReader(metricGroup, readerContext, new IdBasedComparator());
+    reader.start();
+    reader.addSplits(splits);
+
+    while (output.getEmittedRecords().size() < expected) {
+      reader.pollNext(output);
+    }
+
+    // One more poll after the expected count is reached; the size check below then fails if
+    // this poll emitted an additional record
+    reader.pollNext(output);
+
+    List<RowData> emitted = output.getEmittedRecords();
+    Assert.assertEquals(expected, emitted.size());
+    return emitted;
+  }
+
   private void testOneSplitFetcher(
       IcebergSourceReader reader,
       TestingReaderOutput<RowData> readerOutput,
@@ -96,7 +152,9 @@ public class TestIcebergSourceReader {
   }
 
   private IcebergSourceReader createReader(
-      MetricGroup metricGroup, SourceReaderContext readerContext) {
+      MetricGroup metricGroup,
+      SourceReaderContext readerContext,
+      SerializableComparator<IcebergSourceSplit> splitComparator) {
     IcebergSourceReaderMetrics readerMetrics =
         new IcebergSourceReaderMetrics(metricGroup, "db.tbl");
     RowDataReaderFunction readerFunction =
@@ -109,6 +167,13 @@ public class TestIcebergSourceReader {
             new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
             new PlaintextEncryptionManager(),
             Collections.emptyList());
-    return new IcebergSourceReader<>(readerMetrics, readerFunction, 
readerContext);
+    return new IcebergSourceReader<>(readerMetrics, readerFunction, 
splitComparator, readerContext);
+  }
+
+  /** Compares splits by their split ids, so the tests get a deterministic split order. */
+  private static class IdBasedComparator implements SerializableComparator<IcebergSourceSplit> {
+    @Override
+    public int compare(IcebergSourceSplit o1, IcebergSourceSplit o2) {
+      int order = o1.splitId().compareTo(o2.splitId());
+      return order;
+    }
   }
 }
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index cbdd184870..d3859452a2 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -48,6 +48,8 @@ import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.FlinkReadOptions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.source.assigner.OrderedSplitAssignerFactory;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
 import org.apache.iceberg.flink.source.assigner.SplitAssigner;
 import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
 import org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator;
@@ -63,6 +65,7 @@ import org.apache.iceberg.flink.source.reader.ReaderFunction;
 import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
@@ -76,6 +79,7 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
   private final ScanContext scanContext;
   private final ReaderFunction<T> readerFunction;
   private final SplitAssignerFactory assignerFactory;
+  private final SerializableComparator<IcebergSourceSplit> splitComparator;
 
   // Can't use SerializableTable as enumerator needs a regular table
   // that can discover table changes
@@ -86,11 +90,13 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
       ScanContext scanContext,
       ReaderFunction<T> readerFunction,
       SplitAssignerFactory assignerFactory,
+      SerializableComparator<IcebergSourceSplit> splitComparator,
       Table table) {
     this.tableLoader = tableLoader;
     this.scanContext = scanContext;
     this.readerFunction = readerFunction;
     this.assignerFactory = assignerFactory;
+    this.splitComparator = splitComparator;
     this.table = table;
   }
 
@@ -146,7 +152,7 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
   public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext 
readerContext) {
     IcebergSourceReaderMetrics metrics =
         new IcebergSourceReaderMetrics(readerContext.metricGroup(), 
lazyTable().name());
-    return new IcebergSourceReader<>(metrics, readerFunction, readerContext);
+    return new IcebergSourceReader<>(metrics, readerFunction, splitComparator, 
readerContext);
   }
 
   @Override
@@ -209,6 +215,7 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
     private TableLoader tableLoader;
     private Table table;
     private SplitAssignerFactory splitAssignerFactory;
+    private SerializableComparator<IcebergSourceSplit> splitComparator;
     private ReaderFunction<T> readerFunction;
     private ReadableConfig flinkConfig = new Configuration();
     private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -234,6 +241,12 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
       return this;
     }
 
+    /**
+     * Sets the comparator used for ordering the splits.
+     *
+     * <p>The comparator is handed over to the created source and its readers. When no split
+     * assigner factory is set on this builder, a non-null comparator makes the build use an
+     * {@link OrderedSplitAssignerFactory}, otherwise a {@link SimpleSplitAssignerFactory} is used.
+     *
+     * @param newSplitComparator the comparator for the splits, may be null
+     * @return this builder
+     */
+    public Builder<T> splitComparator(
+        SerializableComparator<IcebergSourceSplit> newSplitComparator) {
+      splitComparator = newSplitComparator;
+      return this;
+    }
+
     public Builder<T> readerFunction(ReaderFunction<T> newReaderFunction) {
       this.readerFunction = newReaderFunction;
       return this;
@@ -462,10 +475,18 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
         }
       }
 
+      if (splitAssignerFactory == null) {
+        if (splitComparator == null) {
+          splitAssignerFactory = new SimpleSplitAssignerFactory();
+        } else {
+          splitAssignerFactory = new 
OrderedSplitAssignerFactory(splitComparator);
+        }
+      }
+
       checkRequired();
       // Since builder already load the table, pass it to the source to avoid 
double loading
       return new IcebergSource<T>(
-          tableLoader, context, readerFunction, splitAssignerFactory, table);
+          tableLoader, context, readerFunction, splitAssignerFactory, 
splitComparator, table);
     }
 
     private void checkRequired() {
diff --git 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
similarity index 82%
rename from 
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
rename to 
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
index b43660f907..37a0f1a605 100644
--- 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
@@ -20,7 +20,8 @@ package org.apache.iceberg.flink.source.assigner;
 
 import java.util.ArrayDeque;
 import java.util.Collection;
-import java.util.Deque;
+import java.util.PriorityQueue;
+import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
@@ -28,24 +29,27 @@ import org.apache.flink.annotation.Internal;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
 
 /**
  * Since all methods are called in the source coordinator thread by 
enumerator, there is no need for
  * locking.
  */
 @Internal
-public class SimpleSplitAssigner implements SplitAssigner {
+public class DefaultSplitAssigner implements SplitAssigner {
 
-  private final Deque<IcebergSourceSplit> pendingSplits;
+  private final Queue<IcebergSourceSplit> pendingSplits;
   private CompletableFuture<Void> availableFuture;
 
-  public SimpleSplitAssigner() {
-    this.pendingSplits = new ArrayDeque<>();
+  public DefaultSplitAssigner(SerializableComparator<IcebergSourceSplit> 
comparator) {
+    this.pendingSplits = comparator == null ? new ArrayDeque<>() : new 
PriorityQueue<>(comparator);
   }
 
-  public SimpleSplitAssigner(Collection<IcebergSourceSplitState> 
assignerState) {
-    this.pendingSplits = new ArrayDeque<>(assignerState.size());
-    // Because simple assigner only tracks unassigned splits,
+  public DefaultSplitAssigner(
+      SerializableComparator<IcebergSourceSplit> comparator,
+      Collection<IcebergSourceSplitState> assignerState) {
+    this(comparator);
+    // Because default assigner only tracks unassigned splits,
     // there is no need to filter splits based on status (unassigned) here.
     assignerState.forEach(splitState -> pendingSplits.add(splitState.split()));
   }
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/OrderedSplitAssignerFactory.java
similarity index 54%
copy from 
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
copy to 
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/OrderedSplitAssignerFactory.java
index 1c14f4fcf9..e58478897a 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/OrderedSplitAssignerFactory.java
@@ -19,18 +19,28 @@
 package org.apache.iceberg.flink.source.assigner;
 
 import java.util.Collection;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
 
-/** Create simple assigner that hands out splits without any guarantee in 
order or locality. */
-public class SimpleSplitAssignerFactory implements SplitAssignerFactory {
+/**
+ * Create default assigner with a comparator that hands out splits where the 
order of the splits
+ * will be defined by the {@link SerializableComparator}.
+ */
+public class OrderedSplitAssignerFactory implements SplitAssignerFactory {
+  private final SerializableComparator<IcebergSourceSplit> comparator;
+
+  public 
OrderedSplitAssignerFactory(SerializableComparator<IcebergSourceSplit> 
comparator) {
+    this.comparator = comparator;
+  }
 
   @Override
-  public SimpleSplitAssigner createAssigner() {
-    return new SimpleSplitAssigner();
+  public SplitAssigner createAssigner() {
+    return new DefaultSplitAssigner(comparator);
   }
 
   @Override
-  public SimpleSplitAssigner 
createAssigner(Collection<IcebergSourceSplitState> assignerState) {
-    return new SimpleSplitAssigner(assignerState);
+  public SplitAssigner createAssigner(Collection<IcebergSourceSplitState> 
assignerState) {
+    return new DefaultSplitAssigner(comparator, assignerState);
   }
 }
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
index 1c14f4fcf9..a2e2ff364d 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
@@ -23,14 +23,15 @@ import 
org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
 
 /** Create simple assigner that hands out splits without any guarantee in 
order or locality. */
 public class SimpleSplitAssignerFactory implements SplitAssignerFactory {
+  public SimpleSplitAssignerFactory() {}
 
   @Override
-  public SimpleSplitAssigner createAssigner() {
-    return new SimpleSplitAssigner();
+  public SplitAssigner createAssigner() {
+    return new DefaultSplitAssigner(null);
   }
 
   @Override
-  public SimpleSplitAssigner 
createAssigner(Collection<IcebergSourceSplitState> assignerState) {
-    return new SimpleSplitAssigner(assignerState);
+  public SplitAssigner createAssigner(Collection<IcebergSourceSplitState> 
assignerState) {
+    return new DefaultSplitAssigner(null, assignerState);
   }
 }
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
index 84c98055ce..8d7d68f961 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
@@ -25,6 +25,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
 import org.apache.iceberg.flink.source.split.SplitRequestEvent;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
@@ -36,9 +37,10 @@ public class IcebergSourceReader<T>
   public IcebergSourceReader(
       IcebergSourceReaderMetrics metrics,
       ReaderFunction<T> readerFunction,
+      SerializableComparator<IcebergSourceSplit> splitComparator,
       SourceReaderContext context) {
     super(
-        () -> new IcebergSourceSplitReader<>(metrics, readerFunction, context),
+        () -> new IcebergSourceSplitReader<>(metrics, readerFunction, 
splitComparator, context),
         new IcebergSourceRecordEmitter<>(),
         context.getConfiguration(),
         context);
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
index bb016671e6..4e270dfa3d 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayDeque;
 import java.util.Collections;
+import java.util.List;
 import java.util.Queue;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.connector.base.source.reader.RecordsBySplits;
@@ -31,7 +32,9 @@ import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +43,7 @@ class IcebergSourceSplitReader<T> implements 
SplitReader<RecordAndPosition<T>, I
 
   private final IcebergSourceReaderMetrics metrics;
   private final ReaderFunction<T> openSplitFunction;
+  private final SerializableComparator<IcebergSourceSplit> splitComparator;
   private final int indexOfSubtask;
   private final Queue<IcebergSourceSplit> splits;
 
@@ -50,9 +54,11 @@ class IcebergSourceSplitReader<T> implements 
SplitReader<RecordAndPosition<T>, I
   IcebergSourceSplitReader(
       IcebergSourceReaderMetrics metrics,
       ReaderFunction<T> openSplitFunction,
+      SerializableComparator<IcebergSourceSplit> splitComparator,
       SourceReaderContext context) {
     this.metrics = metrics;
     this.openSplitFunction = openSplitFunction;
+    this.splitComparator = splitComparator;
     this.indexOfSubtask = context.getIndexOfSubtask();
     this.splits = new ArrayDeque<>();
   }
@@ -93,8 +99,15 @@ class IcebergSourceSplitReader<T> implements 
SplitReader<RecordAndPosition<T>, I
           String.format("Unsupported split change: %s", 
splitsChange.getClass()));
     }
 
-    LOG.info("Add {} splits to reader", splitsChange.splits().size());
-    splits.addAll(splitsChange.splits());
+    if (splitComparator != null) {
+      List<IcebergSourceSplit> newSplits = 
Lists.newArrayList(splitsChange.splits());
+      newSplits.sort(splitComparator);
+      LOG.info("Add {} splits to reader: {}", newSplits.size(), newSplits);
+      splits.addAll(newSplits);
+    } else {
+      LOG.info("Add {} splits to reader", splitsChange.splits().size());
+      splits.addAll(splitsChange.splits());
+    }
     metrics.incrementAssignedSplits(splitsChange.splits().size());
     metrics.incrementAssignedBytes(calculateBytes(splitsChange));
   }
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializableComparator.java
similarity index 57%
copy from 
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
copy to 
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializableComparator.java
index 1c14f4fcf9..319648ca27 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializableComparator.java
@@ -16,21 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.flink.source.assigner;
+package org.apache.iceberg.flink.source.split;
 
-import java.util.Collection;
-import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import java.io.Serializable;
+import java.util.Comparator;
 
-/** Create simple assigner that hands out splits without any guarantee in 
order or locality. */
-public class SimpleSplitAssignerFactory implements SplitAssignerFactory {
-
-  @Override
-  public SimpleSplitAssigner createAssigner() {
-    return new SimpleSplitAssigner();
-  }
-
-  @Override
-  public SimpleSplitAssigner 
createAssigner(Collection<IcebergSourceSplitState> assignerState) {
-    return new SimpleSplitAssigner(assignerState);
-  }
-}
+/**
+ * A {@link Comparator} which is also {@link Serializable}.
+ *
+ * <p>The only abstract method is {@link Comparator#compare(Object, Object)}, so implementations
+ * can be provided as lambda expressions.
+ *
+ * @param <T> the type of the objects compared by this comparator
+ */
+@FunctionalInterface
+public interface SerializableComparator<T> extends Comparator<T>, Serializable {}
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
new file mode 100644
index 0000000000..64e03d77de
--- /dev/null
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.split;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Provides implementations of {@link org.apache.iceberg.flink.source.split.SerializableComparator}
+ * which could be used for ordering splits. These are used by the {@link
+ * org.apache.iceberg.flink.source.assigner.OrderedSplitAssignerFactory} and the {@link
+ * org.apache.iceberg.flink.source.reader.IcebergSourceReader}
+ */
+public class SplitComparators {
+  private SplitComparators() {}
+
+  /**
+   * Comparator which orders the splits based on the file sequence number of the data files.
+   *
+   * <p>Splits with the same file sequence number are ordered by their split ids. The returned
+   * comparator only accepts splits which contain exactly one file, and it requires a non-null
+   * file sequence number for both of the compared splits.
+   *
+   * @return the comparator ordering the splits by file sequence number first, then by split id
+   */
+  public static SerializableComparator<IcebergSourceSplit> fileSequenceNumber() {
+    return (IcebergSourceSplit o1, IcebergSourceSplit o2) -> {
+      // A split combining several files has no single sequence number to compare by
+      Preconditions.checkArgument(
+          o1.task().files().size() == 1 && o2.task().files().size() == 1,
+          "Could not compare combined task. Please use 'split-open-file-cost' to prevent combining multiple files to a split");
+
+      Long seq1 = o1.task().files().iterator().next().file().fileSequenceNumber();
+      Long seq2 = o2.task().files().iterator().next().file().fileSequenceNumber();
+
+      Preconditions.checkNotNull(
+          seq1,
+          "Invalid file sequence number: null. Doesn't support splits written with V1 format: %s",
+          o1);
+      Preconditions.checkNotNull(
+          seq2,
+          "Invalid file sequence number: null. Doesn't support splits written with V1 format: %s",
+          o2);
+
+      int temp = Long.compare(seq1, seq2);
+      if (temp != 0) {
+        return temp;
+      } else {
+        // Fall back to the split id when the sequence numbers are equal
+        return o1.splitId().compareTo(o2.splitId());
+      }
+    };
+  }
+}
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
index 8dc68aad10..3a8071523b 100644
--- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
@@ -20,19 +20,21 @@ package org.apache.iceberg.flink.source;
 
 import java.io.File;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.BaseCombinedScanTask;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.ThreadPools;
 import org.junit.Assert;
@@ -40,8 +42,6 @@ import org.junit.rules.TemporaryFolder;
 
 public class SplitHelpers {
 
-  private static final AtomicLong splitLengthIncrement = new AtomicLong();
-
   private SplitHelpers() {}
 
   /**
@@ -54,16 +54,53 @@ public class SplitHelpers {
    *
    *     <p>Since the table and data files are deleted before this method 
return, caller shouldn't
    *     attempt to read the data files.
+   *
+   *     <p>By default, a v1 Iceberg table is created. For a v2 table use {@link
+   *     SplitHelpers#createSplitsFromTransientHadoopTable(TemporaryFolder, int, int, String)}.
+   *
+   * @param temporaryFolder Folder to place the data to
+   * @param fileCount The number of files to create and add to the table
+   * @param filesPerSplit The number of files used for a split
    */
   public static List<IcebergSourceSplit> createSplitsFromTransientHadoopTable(
       TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit) 
throws Exception {
+    return createSplitsFromTransientHadoopTable(temporaryFolder, fileCount, 
filesPerSplit, "1");
+  }
+
+  /**
+   * This creates a list of IcebergSourceSplit from real files
+   * <li>Create a new Hadoop table under the {@code temporaryFolder}
+   * <li>write {@code fileCount} number of files to the new Iceberg table
+   * <li>Discover the splits from the table and partition the splits by the {@code filesPerSplit}
+   *     limit
+   * <li>Delete the Hadoop table
+   *
+   *     <p>Since the table and data files are deleted before this method returns, the caller
+   *     shouldn't attempt to read the data files.
+   *
+   * @param temporaryFolder Folder to place the data to
+   * @param fileCount The number of files to create and add to the table
+   * @param filesPerSplit The number of files used for a split
+   * @param version The table version to create
+   */
+  public static List<IcebergSourceSplit> createSplitsFromTransientHadoopTable(
+      TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit, 
String version)
+      throws Exception {
     final File warehouseFile = temporaryFolder.newFolder();
     Assert.assertTrue(warehouseFile.delete());
     final String warehouse = "file:" + warehouseFile;
     Configuration hadoopConf = new Configuration();
     final HadoopCatalog catalog = new HadoopCatalog(hadoopConf, warehouse);
+    ImmutableMap<String, String> properties =
+        ImmutableMap.of(TableProperties.FORMAT_VERSION, version);
     try {
-      final Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER, 
TestFixtures.SCHEMA);
+      final Table table =
+          catalog.createTable(
+              TestFixtures.TABLE_IDENTIFIER,
+              TestFixtures.SCHEMA,
+              PartitionSpec.unpartitioned(),
+              null,
+              properties);
       final GenericAppenderHelper dataAppender =
           new GenericAppenderHelper(table, FileFormat.PARQUET, 
temporaryFolder);
       for (int i = 0; i < fileCount; ++i) {
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestSimpleSplitAssigner.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
similarity index 88%
rename from 
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestSimpleSplitAssigner.java
rename to 
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
index ee6c5cc3a6..f28677ca9d 100644
--- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestSimpleSplitAssigner.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
@@ -30,22 +30,24 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-public class TestSimpleSplitAssigner {
+public abstract class SplitAssignerTestBase {
   @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
 
   @Test
   public void testEmptyInitialization() {
-    SimpleSplitAssigner assigner = new SimpleSplitAssigner();
+    SplitAssigner assigner = splitAssigner();
     assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
   }
 
   /** Test a sequence of interactions for StaticEnumerator */
   @Test
   public void testStaticEnumeratorSequence() throws Exception {
-    SimpleSplitAssigner assigner = new SimpleSplitAssigner();
+    SplitAssigner assigner = splitAssigner();
     assigner.onDiscoveredSplits(
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 
2));
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 
1));
 
+    assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+    assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
     assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
     assertSnapshot(assigner, 1);
     assigner.onUnassignedSplits(
@@ -61,7 +63,7 @@ public class TestSimpleSplitAssigner {
   /** Test a sequence of interactions for ContinuousEnumerator */
   @Test
   public void testContinuousEnumeratorSequence() throws Exception {
-    SimpleSplitAssigner assigner = new SimpleSplitAssigner();
+    SplitAssigner assigner = splitAssigner();
     assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
 
     List<IcebergSourceSplit> splits1 =
@@ -81,7 +83,7 @@ public class TestSimpleSplitAssigner {
   }
 
   private void assertAvailableFuture(
-      SimpleSplitAssigner assigner, int splitCount, Runnable 
addSplitsRunnable) {
+      SplitAssigner assigner, int splitCount, Runnable addSplitsRunnable) {
     // register callback
     AtomicBoolean futureCompleted = new AtomicBoolean();
     CompletableFuture<Void> future = assigner.isAvailable();
@@ -102,7 +104,7 @@ public class TestSimpleSplitAssigner {
     assertSnapshot(assigner, 0);
   }
 
-  private void assertGetNext(SimpleSplitAssigner assigner, 
GetSplitResult.Status expectedStatus) {
+  protected void assertGetNext(SplitAssigner assigner, GetSplitResult.Status 
expectedStatus) {
     GetSplitResult result = assigner.getNext(null);
     Assert.assertEquals(expectedStatus, result.status());
     switch (expectedStatus) {
@@ -118,8 +120,10 @@ public class TestSimpleSplitAssigner {
     }
   }
 
-  private void assertSnapshot(SimpleSplitAssigner assigner, int splitCount) {
+  protected void assertSnapshot(SplitAssigner assigner, int splitCount) {
     Collection<IcebergSourceSplitState> stateBeforeGet = assigner.state();
     Assert.assertEquals(splitCount, stateBeforeGet.size());
   }
+
+  protected abstract SplitAssigner splitAssigner();
 }
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestDefaultSplitAssigner.java
similarity index 52%
copy from 
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
copy to 
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestDefaultSplitAssigner.java
index 1c14f4fcf9..8994f3054a 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestDefaultSplitAssigner.java
@@ -18,19 +18,26 @@
  */
 package org.apache.iceberg.flink.source.assigner;
 
-import java.util.Collection;
-import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
-
-/** Create simple assigner that hands out splits without any guarantee in 
order or locality. */
-public class SimpleSplitAssignerFactory implements SplitAssignerFactory {
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.junit.Test;
 
+public class TestDefaultSplitAssigner extends SplitAssignerTestBase {
   @Override
-  public SimpleSplitAssigner createAssigner() {
-    return new SimpleSplitAssigner();
+  protected SplitAssigner splitAssigner() {
+    return new DefaultSplitAssigner(null);
   }
 
-  @Override
-  public SimpleSplitAssigner 
createAssigner(Collection<IcebergSourceSplitState> assignerState) {
-    return new SimpleSplitAssigner(assignerState);
+  /** Test the assigner when multiple files are in a single split */
+  @Test
+  public void testMultipleFilesInASplit() throws Exception {
+    SplitAssigner assigner = splitAssigner();
+    assigner.onDiscoveredSplits(
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 
2));
+
+    assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+    assertSnapshot(assigner, 1);
+    assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+    assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
+    assertSnapshot(assigner, 0);
   }
 }
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
new file mode 100644
index 0000000000..8b9e132e0e
--- /dev/null
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.List;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
+import org.apache.iceberg.flink.source.split.SplitComparators;
+import org.apache.iceberg.util.SerializationUtil;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFileSequenceNumberBasedSplitAssigner extends 
SplitAssignerTestBase {
+  @Override
+  protected SplitAssigner splitAssigner() {
+    return new 
OrderedSplitAssignerFactory(SplitComparators.fileSequenceNumber()).createAssigner();
+  }
+
+  /** Test the assigner when multiple files are in a single split */
+  @Test
+  public void testMultipleFilesInAnIcebergSplit() {
+    SplitAssigner assigner = splitAssigner();
+    Assertions.assertThatThrownBy(
+            () ->
+                assigner.onDiscoveredSplits(
+                    
SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 2, "2")),
+            "Multiple files in a split is not allowed")
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Please use 'split-open-file-cost'");
+  }
+
+  /** Test sorted splits */
+  @Test
+  public void testSplitSort() throws Exception {
+    SplitAssigner assigner = splitAssigner();
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 5, 
1, "2");
+
+    assigner.onDiscoveredSplits(splits.subList(3, 5));
+    assigner.onDiscoveredSplits(splits.subList(0, 1));
+    assigner.onDiscoveredSplits(splits.subList(1, 3));
+
+    assertGetNext(assigner, 1L);
+    assertGetNext(assigner, 2L);
+    assertGetNext(assigner, 3L);
+    assertGetNext(assigner, 4L);
+    assertGetNext(assigner, 5L);
+
+    assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
+  }
+
+  @Test
+  public void testSerializable() {
+    byte[] bytes = 
SerializationUtil.serializeToBytes(SplitComparators.fileSequenceNumber());
+    SerializableComparator<IcebergSourceSplit> comparator =
+        SerializationUtil.deserializeFromBytes(bytes);
+    Assert.assertNotNull(comparator);
+  }
+
+  protected void assertGetNext(SplitAssigner assigner, Long 
expectedSequenceNumber) {
+    GetSplitResult result = assigner.getNext(null);
+    ContentFile file = result.split().task().files().iterator().next().file();
+    Assert.assertEquals(expectedSequenceNumber, file.fileSequenceNumber());
+  }
+}
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
index d0ae8fdf77..349eb11cf5 100644
--- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
@@ -28,7 +28,7 @@ import 
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumerator
 import org.apache.iceberg.flink.source.ScanContext;
 import org.apache.iceberg.flink.source.SplitHelpers;
 import org.apache.iceberg.flink.source.StreamingStartingStrategy;
-import org.apache.iceberg.flink.source.assigner.SimpleSplitAssigner;
+import org.apache.iceberg.flink.source.assigner.DefaultSplitAssigner;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
@@ -342,7 +342,7 @@ public class TestContinuousIcebergEnumerator {
     ContinuousIcebergEnumerator enumerator =
         new ContinuousIcebergEnumerator(
             context,
-            new SimpleSplitAssigner(Collections.emptyList()),
+            new DefaultSplitAssigner(null, Collections.emptyList()),
             scanContext,
             splitPlanner,
             null);
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
index 56af0caf12..def4f43685 100644
--- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
@@ -35,6 +35,7 @@ import 
org.apache.iceberg.encryption.PlaintextEncryptionManager;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.junit.Assert;
 import org.junit.ClassRule;
@@ -55,13 +56,68 @@ public class TestIcebergSourceReader {
     TestingReaderOutput<RowData> readerOutput = new TestingReaderOutput<>();
     TestingMetricGroup metricGroup = new TestingMetricGroup();
     TestingReaderContext readerContext = new TestingReaderContext(new 
Configuration(), metricGroup);
-    IcebergSourceReader reader = createReader(metricGroup, readerContext);
+    IcebergSourceReader reader = createReader(metricGroup, readerContext, 
null);
     reader.start();
 
     testOneSplitFetcher(reader, readerOutput, metricGroup, 1);
     testOneSplitFetcher(reader, readerOutput, metricGroup, 2);
   }
 
+  /**
+   * Reads the same two tasks twice, with the splits added in opposite orders, and checks that the
+   * emitted records are the same position by position.
+   */
+  @Test
+  public void testReaderOrder() throws Exception {
+    // Create 2 tasks, one for each split
+    List<List<Record>> firstBatches =
+        ReaderUtil.createRecordBatchList(0L, TestFixtures.SCHEMA, 1, 1);
+    CombinedScanTask firstTask =
+        ReaderUtil.createCombinedScanTask(
+            firstBatches, TEMPORARY_FOLDER, FileFormat.PARQUET, appenderFactory);
+
+    List<List<Record>> secondBatches =
+        ReaderUtil.createRecordBatchList(1L, TestFixtures.SCHEMA, 1, 1);
+    CombinedScanTask secondTask =
+        ReaderUtil.createCombinedScanTask(
+            secondBatches, TEMPORARY_FOLDER, FileFormat.PARQUET, appenderFactory);
+
+    // Add the splits in one order
+    List<IcebergSourceSplit> forwardSplits =
+        Arrays.asList(
+            IcebergSourceSplit.fromCombinedScanTask(firstTask),
+            IcebergSourceSplit.fromCombinedScanTask(secondTask));
+    List<RowData> forwardRows = read(forwardSplits, 2);
+
+    // Add freshly created splits for the same tasks in the reverse order
+    List<IcebergSourceSplit> reversedSplits =
+        Arrays.asList(
+            IcebergSourceSplit.fromCombinedScanTask(secondTask),
+            IcebergSourceSplit.fromCombinedScanTask(firstTask));
+    List<RowData> reversedRows = read(reversedSplits, 2);
+
+    // Check that the order of the elements is not changed
+    for (int i = 0; i < 2; i++) {
+      Assert.assertEquals(forwardRows.get(i), reversedRows.get(i));
+    }
+  }
+
+  /**
+   * Adds the given splits to a newly created reader and returns the records it emits.
+   *
+   * <p>The reader is polled until at least {@code expected} records were emitted, then polled once
+   * more, after which the number of emitted records is asserted to be exactly {@code expected}.
+   *
+   * @param splits the splits to add to the reader
+   * @param expected the number of records the reader is expected to emit
+   * @return the records emitted by the reader
+   */
+  private List<RowData> read(List<IcebergSourceSplit> splits, long expected) throws Exception {
+    TestingMetricGroup metrics = new TestingMetricGroup();
+    TestingReaderContext context = new TestingReaderContext(new Configuration(), metrics);
+    // Using IdBasedComparator, so we can have a deterministic order of the splits
+    IcebergSourceReader sourceReader = createReader(metrics, context, new IdBasedComparator());
+    sourceReader.start();
+    sourceReader.addSplits(splits);
+
+    TestingReaderOutput<RowData> output = new TestingReaderOutput<>();
+    while (output.getEmittedRecords().size() < expected) {
+      sourceReader.pollNext(output);
+    }
+
+    // One additional poll after the expected number of records was reached
+    sourceReader.pollNext(output);
+
+    List<RowData> emitted = output.getEmittedRecords();
+    Assert.assertEquals(expected, emitted.size());
+    return emitted;
+  }
+
   private void testOneSplitFetcher(
       IcebergSourceReader reader,
       TestingReaderOutput<RowData> readerOutput,
@@ -96,7 +152,9 @@ public class TestIcebergSourceReader {
   }
 
   private IcebergSourceReader createReader(
-      MetricGroup metricGroup, SourceReaderContext readerContext) {
+      MetricGroup metricGroup,
+      SourceReaderContext readerContext,
+      SerializableComparator<IcebergSourceSplit> splitComparator) {
     IcebergSourceReaderMetrics readerMetrics =
         new IcebergSourceReaderMetrics(metricGroup, "db.tbl");
     RowDataReaderFunction readerFunction =
@@ -109,6 +167,13 @@ public class TestIcebergSourceReader {
             new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
             new PlaintextEncryptionManager(),
             Collections.emptyList());
-    return new IcebergSourceReader<>(readerMetrics, readerFunction, 
readerContext);
+    return new IcebergSourceReader<>(readerMetrics, readerFunction, 
splitComparator, readerContext);
+  }
+
+  private static class IdBasedComparator implements 
SerializableComparator<IcebergSourceSplit> {
+    @Override
+    public int compare(IcebergSourceSplit o1, IcebergSourceSplit o2) {
+      return o1.splitId().compareTo(o2.splitId());
+    }
   }
 }

Reply via email to