This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 006591c876 Flink: Backport to 1.15 and 1.16: Add possibility of
ordering the splits based on the file sequence number (#7661) (#7889)
006591c876 is described below
commit 006591c876f6dfb8a0877f5c55d540359f84fcf2
Author: pvary <[email protected]>
AuthorDate: Mon Jun 26 09:48:59 2023 +0200
Flink: Backport to 1.15 and 1.16: Add possibility of ordering the splits
based on the file sequence number (#7661) (#7889)
---
.../apache/iceberg/flink/source/IcebergSource.java | 25 ++++++-
.../source/assigner/DefaultSplitAssigner.java} | 20 +++---
.../assigner/OrderedSplitAssignerFactory.java} | 22 ++++--
.../assigner/SimpleSplitAssignerFactory.java | 9 +--
.../flink/source/reader/IcebergSourceReader.java | 4 +-
.../source/reader/IcebergSourceSplitReader.java | 17 ++++-
.../source/split/SerializableComparator.java} | 20 ++----
.../flink/source/split/SplitComparators.java | 59 +++++++++++++++
.../apache/iceberg/flink/source/SplitHelpers.java | 45 ++++++++++--
...litAssigner.java => SplitAssignerTestBase.java} | 20 +++---
.../source/assigner/TestDefaultSplitAssigner.java} | 27 ++++---
.../TestFileSequenceNumberBasedSplitAssigner.java | 84 ++++++++++++++++++++++
.../TestContinuousIcebergEnumerator.java | 4 +-
.../source/reader/TestIcebergSourceReader.java | 71 +++++++++++++++++-
.../apache/iceberg/flink/source/IcebergSource.java | 25 ++++++-
.../source/assigner/DefaultSplitAssigner.java} | 20 +++---
...ctory.java => OrderedSplitAssignerFactory.java} | 22 ++++--
.../assigner/SimpleSplitAssignerFactory.java | 9 +--
.../flink/source/reader/IcebergSourceReader.java | 4 +-
.../source/reader/IcebergSourceSplitReader.java | 17 ++++-
.../SerializableComparator.java} | 20 ++----
.../flink/source/split/SplitComparators.java | 59 +++++++++++++++
.../apache/iceberg/flink/source/SplitHelpers.java | 45 ++++++++++--
...litAssigner.java => SplitAssignerTestBase.java} | 20 +++---
.../source/assigner/TestDefaultSplitAssigner.java} | 27 ++++---
.../TestFileSequenceNumberBasedSplitAssigner.java | 84 ++++++++++++++++++++++
.../TestContinuousIcebergEnumerator.java | 4 +-
.../source/reader/TestIcebergSourceReader.java | 71 +++++++++++++++++-
28 files changed, 722 insertions(+), 132 deletions(-)
diff --git
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index cbdd184870..d3859452a2 100644
---
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -48,6 +48,8 @@ import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.source.assigner.OrderedSplitAssignerFactory;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
import org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator;
@@ -63,6 +65,7 @@ import org.apache.iceberg.flink.source.reader.ReaderFunction;
import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
@@ -76,6 +79,7 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
private final ScanContext scanContext;
private final ReaderFunction<T> readerFunction;
private final SplitAssignerFactory assignerFactory;
+ private final SerializableComparator<IcebergSourceSplit> splitComparator;
// Can't use SerializableTable as enumerator needs a regular table
// that can discover table changes
@@ -86,11 +90,13 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
ScanContext scanContext,
ReaderFunction<T> readerFunction,
SplitAssignerFactory assignerFactory,
+ SerializableComparator<IcebergSourceSplit> splitComparator,
Table table) {
this.tableLoader = tableLoader;
this.scanContext = scanContext;
this.readerFunction = readerFunction;
this.assignerFactory = assignerFactory;
+ this.splitComparator = splitComparator;
this.table = table;
}
@@ -146,7 +152,7 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext
readerContext) {
IcebergSourceReaderMetrics metrics =
new IcebergSourceReaderMetrics(readerContext.metricGroup(),
lazyTable().name());
- return new IcebergSourceReader<>(metrics, readerFunction, readerContext);
+ return new IcebergSourceReader<>(metrics, readerFunction, splitComparator,
readerContext);
}
@Override
@@ -209,6 +215,7 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
private TableLoader tableLoader;
private Table table;
private SplitAssignerFactory splitAssignerFactory;
+ private SerializableComparator<IcebergSourceSplit> splitComparator;
private ReaderFunction<T> readerFunction;
private ReadableConfig flinkConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -234,6 +241,12 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
return this;
}
+ public Builder<T> splitComparator(
+ SerializableComparator<IcebergSourceSplit> newSplitComparator) {
+ this.splitComparator = newSplitComparator;
+ return this;
+ }
+
public Builder<T> readerFunction(ReaderFunction<T> newReaderFunction) {
this.readerFunction = newReaderFunction;
return this;
@@ -462,10 +475,18 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
}
}
+ if (splitAssignerFactory == null) {
+ if (splitComparator == null) {
+ splitAssignerFactory = new SimpleSplitAssignerFactory();
+ } else {
+ splitAssignerFactory = new
OrderedSplitAssignerFactory(splitComparator);
+ }
+ }
+
checkRequired();
// Since builder already load the table, pass it to the source to avoid
double loading
return new IcebergSource<T>(
- tableLoader, context, readerFunction, splitAssignerFactory, table);
+ tableLoader, context, readerFunction, splitAssignerFactory,
splitComparator, table);
}
private void checkRequired() {
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
similarity index 82%
rename from
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
rename to
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
index b43660f907..37a0f1a605 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
+++
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
@@ -20,7 +20,8 @@ package org.apache.iceberg.flink.source.assigner;
import java.util.ArrayDeque;
import java.util.Collection;
-import java.util.Deque;
+import java.util.PriorityQueue;
+import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -28,24 +29,27 @@ import org.apache.flink.annotation.Internal;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
/**
* Since all methods are called in the source coordinator thread by
enumerator, there is no need for
* locking.
*/
@Internal
-public class SimpleSplitAssigner implements SplitAssigner {
+public class DefaultSplitAssigner implements SplitAssigner {
- private final Deque<IcebergSourceSplit> pendingSplits;
+ private final Queue<IcebergSourceSplit> pendingSplits;
private CompletableFuture<Void> availableFuture;
- public SimpleSplitAssigner() {
- this.pendingSplits = new ArrayDeque<>();
+ public DefaultSplitAssigner(SerializableComparator<IcebergSourceSplit>
comparator) {
+ this.pendingSplits = comparator == null ? new ArrayDeque<>() : new
PriorityQueue<>(comparator);
}
- public SimpleSplitAssigner(Collection<IcebergSourceSplitState>
assignerState) {
- this.pendingSplits = new ArrayDeque<>(assignerState.size());
- // Because simple assigner only tracks unassigned splits,
+ public DefaultSplitAssigner(
+ SerializableComparator<IcebergSourceSplit> comparator,
+ Collection<IcebergSourceSplitState> assignerState) {
+ this(comparator);
+ // Because default assigner only tracks unassigned splits,
// there is no need to filter splits based on status (unassigned) here.
assignerState.forEach(splitState -> pendingSplits.add(splitState.split()));
}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/OrderedSplitAssignerFactory.java
similarity index 54%
copy from
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
copy to
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/OrderedSplitAssignerFactory.java
index 1c14f4fcf9..e58478897a 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
+++
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/OrderedSplitAssignerFactory.java
@@ -19,18 +19,28 @@
package org.apache.iceberg.flink.source.assigner;
import java.util.Collection;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
-/** Create simple assigner that hands out splits without any guarantee in
order or locality. */
-public class SimpleSplitAssignerFactory implements SplitAssignerFactory {
+/**
+ * Create default assigner with a comparator that hands out splits where the
order of the splits
+ * will be defined by the {@link SerializableComparator}.
+ */
+public class OrderedSplitAssignerFactory implements SplitAssignerFactory {
+ private final SerializableComparator<IcebergSourceSplit> comparator;
+
+ public
OrderedSplitAssignerFactory(SerializableComparator<IcebergSourceSplit>
comparator) {
+ this.comparator = comparator;
+ }
@Override
- public SimpleSplitAssigner createAssigner() {
- return new SimpleSplitAssigner();
+ public SplitAssigner createAssigner() {
+ return new DefaultSplitAssigner(comparator);
}
@Override
- public SimpleSplitAssigner
createAssigner(Collection<IcebergSourceSplitState> assignerState) {
- return new SimpleSplitAssigner(assignerState);
+ public SplitAssigner createAssigner(Collection<IcebergSourceSplitState>
assignerState) {
+ return new DefaultSplitAssigner(comparator, assignerState);
}
}
diff --git
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
index 1c14f4fcf9..a2e2ff364d 100644
---
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
+++
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
@@ -23,14 +23,15 @@ import
org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
/** Create simple assigner that hands out splits without any guarantee in
order or locality. */
public class SimpleSplitAssignerFactory implements SplitAssignerFactory {
+ public SimpleSplitAssignerFactory() {}
@Override
- public SimpleSplitAssigner createAssigner() {
- return new SimpleSplitAssigner();
+ public SplitAssigner createAssigner() {
+ return new DefaultSplitAssigner(null);
}
@Override
- public SimpleSplitAssigner
createAssigner(Collection<IcebergSourceSplitState> assignerState) {
- return new SimpleSplitAssigner(assignerState);
+ public SplitAssigner createAssigner(Collection<IcebergSourceSplitState>
assignerState) {
+ return new DefaultSplitAssigner(null, assignerState);
}
}
diff --git
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
index 84c98055ce..8d7d68f961 100644
---
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
+++
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
@@ -25,6 +25,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceReaderContext;
import
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitRequestEvent;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -36,9 +37,10 @@ public class IcebergSourceReader<T>
public IcebergSourceReader(
IcebergSourceReaderMetrics metrics,
ReaderFunction<T> readerFunction,
+ SerializableComparator<IcebergSourceSplit> splitComparator,
SourceReaderContext context) {
super(
- () -> new IcebergSourceSplitReader<>(metrics, readerFunction, context),
+ () -> new IcebergSourceSplitReader<>(metrics, readerFunction,
splitComparator, context),
new IcebergSourceRecordEmitter<>(),
context.getConfiguration(),
context);
diff --git
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
index bb016671e6..4e270dfa3d 100644
---
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
+++
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Collections;
+import java.util.List;
import java.util.Queue;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
@@ -31,7 +32,9 @@ import
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +43,7 @@ class IcebergSourceSplitReader<T> implements
SplitReader<RecordAndPosition<T>, I
private final IcebergSourceReaderMetrics metrics;
private final ReaderFunction<T> openSplitFunction;
+ private final SerializableComparator<IcebergSourceSplit> splitComparator;
private final int indexOfSubtask;
private final Queue<IcebergSourceSplit> splits;
@@ -50,9 +54,11 @@ class IcebergSourceSplitReader<T> implements
SplitReader<RecordAndPosition<T>, I
IcebergSourceSplitReader(
IcebergSourceReaderMetrics metrics,
ReaderFunction<T> openSplitFunction,
+ SerializableComparator<IcebergSourceSplit> splitComparator,
SourceReaderContext context) {
this.metrics = metrics;
this.openSplitFunction = openSplitFunction;
+ this.splitComparator = splitComparator;
this.indexOfSubtask = context.getIndexOfSubtask();
this.splits = new ArrayDeque<>();
}
@@ -93,8 +99,15 @@ class IcebergSourceSplitReader<T> implements
SplitReader<RecordAndPosition<T>, I
String.format("Unsupported split change: %s",
splitsChange.getClass()));
}
- LOG.info("Add {} splits to reader", splitsChange.splits().size());
- splits.addAll(splitsChange.splits());
+ if (splitComparator != null) {
+ List<IcebergSourceSplit> newSplits =
Lists.newArrayList(splitsChange.splits());
+ newSplits.sort(splitComparator);
+ LOG.info("Add {} splits to reader: {}", newSplits.size(), newSplits);
+ splits.addAll(newSplits);
+ } else {
+ LOG.info("Add {} splits to reader", splitsChange.splits().size());
+ splits.addAll(splitsChange.splits());
+ }
metrics.incrementAssignedSplits(splitsChange.splits().size());
metrics.incrementAssignedBytes(calculateBytes(splitsChange));
}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializableComparator.java
similarity index 57%
copy from
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
copy to
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializableComparator.java
index 1c14f4fcf9..319648ca27 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
+++
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializableComparator.java
@@ -16,21 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.flink.source.assigner;
+package org.apache.iceberg.flink.source.split;
-import java.util.Collection;
-import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import java.io.Serializable;
+import java.util.Comparator;
-/** Create simple assigner that hands out splits without any guarantee in
order or locality. */
-public class SimpleSplitAssignerFactory implements SplitAssignerFactory {
-
- @Override
- public SimpleSplitAssigner createAssigner() {
- return new SimpleSplitAssigner();
- }
-
- @Override
- public SimpleSplitAssigner
createAssigner(Collection<IcebergSourceSplitState> assignerState) {
- return new SimpleSplitAssigner(assignerState);
- }
-}
+public interface SerializableComparator<T> extends Comparator<T>, Serializable
{}
diff --git
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
new file mode 100644
index 0000000000..64e03d77de
--- /dev/null
+++
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.split;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Provides implementations of {@link
org.apache.iceberg.flink.source.split.SerializableComparator}
+ * which could be used for ordering splits. These are used by the {@link
+ * org.apache.iceberg.flink.source.assigner.OrderedSplitAssignerFactory} and
the {@link
+ * org.apache.iceberg.flink.source.reader.IcebergSourceReader}
+ */
+public class SplitComparators {
+ private SplitComparators() {}
+
+ /** Comparator which orders the splits based on the file sequence number of
the data files */
+ public static SerializableComparator<IcebergSourceSplit>
fileSequenceNumber() {
+ return (IcebergSourceSplit o1, IcebergSourceSplit o2) -> {
+ Preconditions.checkArgument(
+ o1.task().files().size() == 1 && o2.task().files().size() == 1,
+ "Could not compare combined task. Please use 'split-open-file-cost'
to prevent combining multiple files to a split");
+
+ Long seq1 =
o1.task().files().iterator().next().file().fileSequenceNumber();
+ Long seq2 =
o2.task().files().iterator().next().file().fileSequenceNumber();
+
+ Preconditions.checkNotNull(
+ seq1,
+ "Invalid file sequence number: null. Doesn't support splits written
with V1 format: %s",
+ o1);
+ Preconditions.checkNotNull(
+ seq2,
+ "Invalid file sequence number: null. Doesn't support splits written
with V1 format: %s",
+ o2);
+
+ int temp = Long.compare(seq1, seq2);
+ if (temp != 0) {
+ return temp;
+ } else {
+ return o1.splitId().compareTo(o2.splitId());
+ }
+ };
+ }
+}
diff --git
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
index 8dc68aad10..3a8071523b 100644
---
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
+++
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
@@ -20,19 +20,21 @@ package org.apache.iceberg.flink.source;
import java.io.File;
import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.ThreadPools;
import org.junit.Assert;
@@ -40,8 +42,6 @@ import org.junit.rules.TemporaryFolder;
public class SplitHelpers {
- private static final AtomicLong splitLengthIncrement = new AtomicLong();
-
private SplitHelpers() {}
/**
@@ -54,16 +54,53 @@ public class SplitHelpers {
*
* <p>Since the table and data files are deleted before this method
return, caller shouldn't
* attempt to read the data files.
+ *
+ * <p>By default, v1 Iceberg table is created. For v2 table use {@link
+ * SplitHelpers#createSplitsFromTransientHadoopTable(TemporaryFolder,
int, int, String)}
+ *
+ * @param temporaryFolder Folder to place the data to
+ * @param fileCount The number of files to create and add to the table
+ * @param filesPerSplit The number of files used for a split
*/
public static List<IcebergSourceSplit> createSplitsFromTransientHadoopTable(
TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit)
throws Exception {
+ return createSplitsFromTransientHadoopTable(temporaryFolder, fileCount,
filesPerSplit, "1");
+ }
+
+ /**
+ * This creates a list of IcebergSourceSplit from real files
+ * <li>Create a new Hadoop table under the {@code temporaryFolder}
+ * <li>write {@code fileCount} number of files to the new Iceberg table
+ * <li>Discover the splits from the table and partition the splits by the
{@code filesPerSplit}
+ * limit
+ * <li>Delete the Hadoop table
+ *
+ * <p>Since the table and data files are deleted before this method
return, caller shouldn't
+ * attempt to read the data files.
+ *
+ * @param temporaryFolder Folder to place the data to
+ * @param fileCount The number of files to create and add to the table
+ * @param filesPerSplit The number of files used for a split
+ * @param version The table version to create
+ */
+ public static List<IcebergSourceSplit> createSplitsFromTransientHadoopTable(
+ TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit,
String version)
+ throws Exception {
final File warehouseFile = temporaryFolder.newFolder();
Assert.assertTrue(warehouseFile.delete());
final String warehouse = "file:" + warehouseFile;
Configuration hadoopConf = new Configuration();
final HadoopCatalog catalog = new HadoopCatalog(hadoopConf, warehouse);
+ ImmutableMap<String, String> properties =
+ ImmutableMap.of(TableProperties.FORMAT_VERSION, version);
try {
- final Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER,
TestFixtures.SCHEMA);
+ final Table table =
+ catalog.createTable(
+ TestFixtures.TABLE_IDENTIFIER,
+ TestFixtures.SCHEMA,
+ PartitionSpec.unpartitioned(),
+ null,
+ properties);
final GenericAppenderHelper dataAppender =
new GenericAppenderHelper(table, FileFormat.PARQUET,
temporaryFolder);
for (int i = 0; i < fileCount; ++i) {
diff --git
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestSimpleSplitAssigner.java
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
similarity index 88%
rename from
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestSimpleSplitAssigner.java
rename to
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
index ee6c5cc3a6..f28677ca9d 100644
---
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestSimpleSplitAssigner.java
+++
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
@@ -30,22 +30,24 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-public class TestSimpleSplitAssigner {
+public abstract class SplitAssignerTestBase {
@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
@Test
public void testEmptyInitialization() {
- SimpleSplitAssigner assigner = new SimpleSplitAssigner();
+ SplitAssigner assigner = splitAssigner();
assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
}
/** Test a sequence of interactions for StaticEnumerator */
@Test
public void testStaticEnumeratorSequence() throws Exception {
- SimpleSplitAssigner assigner = new SimpleSplitAssigner();
+ SplitAssigner assigner = splitAssigner();
assigner.onDiscoveredSplits(
- SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4,
2));
+ SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4,
1));
+ assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+ assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
assertSnapshot(assigner, 1);
assigner.onUnassignedSplits(
@@ -61,7 +63,7 @@ public class TestSimpleSplitAssigner {
/** Test a sequence of interactions for ContinuousEnumerator */
@Test
public void testContinuousEnumeratorSequence() throws Exception {
- SimpleSplitAssigner assigner = new SimpleSplitAssigner();
+ SplitAssigner assigner = splitAssigner();
assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
List<IcebergSourceSplit> splits1 =
@@ -81,7 +83,7 @@ public class TestSimpleSplitAssigner {
}
private void assertAvailableFuture(
- SimpleSplitAssigner assigner, int splitCount, Runnable
addSplitsRunnable) {
+ SplitAssigner assigner, int splitCount, Runnable addSplitsRunnable) {
// register callback
AtomicBoolean futureCompleted = new AtomicBoolean();
CompletableFuture<Void> future = assigner.isAvailable();
@@ -102,7 +104,7 @@ public class TestSimpleSplitAssigner {
assertSnapshot(assigner, 0);
}
- private void assertGetNext(SimpleSplitAssigner assigner,
GetSplitResult.Status expectedStatus) {
+ protected void assertGetNext(SplitAssigner assigner, GetSplitResult.Status
expectedStatus) {
GetSplitResult result = assigner.getNext(null);
Assert.assertEquals(expectedStatus, result.status());
switch (expectedStatus) {
@@ -118,8 +120,10 @@ public class TestSimpleSplitAssigner {
}
}
- private void assertSnapshot(SimpleSplitAssigner assigner, int splitCount) {
+ protected void assertSnapshot(SplitAssigner assigner, int splitCount) {
Collection<IcebergSourceSplitState> stateBeforeGet = assigner.state();
Assert.assertEquals(splitCount, stateBeforeGet.size());
}
+
+ protected abstract SplitAssigner splitAssigner();
}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestDefaultSplitAssigner.java
similarity index 52%
copy from
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
copy to
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestDefaultSplitAssigner.java
index 1c14f4fcf9..8994f3054a 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
+++
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestDefaultSplitAssigner.java
@@ -18,19 +18,26 @@
*/
package org.apache.iceberg.flink.source.assigner;
-import java.util.Collection;
-import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
-
-/** Create simple assigner that hands out splits without any guarantee in
order or locality. */
-public class SimpleSplitAssignerFactory implements SplitAssignerFactory {
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.junit.Test;
+public class TestDefaultSplitAssigner extends SplitAssignerTestBase {
@Override
- public SimpleSplitAssigner createAssigner() {
- return new SimpleSplitAssigner();
+ protected SplitAssigner splitAssigner() {
+ return new DefaultSplitAssigner(null);
}
- @Override
- public SimpleSplitAssigner
createAssigner(Collection<IcebergSourceSplitState> assignerState) {
- return new SimpleSplitAssigner(assignerState);
+ /** Test the assigner when multiple files are in a single split */
+ @Test
+ public void testMultipleFilesInASplit() throws Exception {
+ SplitAssigner assigner = splitAssigner();
+ assigner.onDiscoveredSplits(
+ SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4,
2));
+
+ assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+ assertSnapshot(assigner, 1);
+ assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+ assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
+ assertSnapshot(assigner, 0);
}
}
diff --git
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
new file mode 100644
index 0000000000..8b9e132e0e
--- /dev/null
+++
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.List;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
+import org.apache.iceberg.flink.source.split.SplitComparators;
+import org.apache.iceberg.util.SerializationUtil;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Runs the shared {@link SplitAssignerTestBase} scenarios against an assigner created by {@link
+ * OrderedSplitAssignerFactory} with the {@link SplitComparators#fileSequenceNumber()} comparator.
+ */
+public class TestFileSequenceNumberBasedSplitAssigner extends SplitAssignerTestBase {
+  /** Creates the assigner under test: ordered by the file sequence number comparator. */
+  @Override
+  protected SplitAssigner splitAssigner() {
+    return new OrderedSplitAssignerFactory(SplitComparators.fileSequenceNumber()).createAssigner();
+  }
+
+  /**
+   * Test the assigner when multiple files are in a single split. The comparator is expected to
+   * reject such splits with an {@link IllegalArgumentException}.
+   */
+  @Test
+  public void testMultipleFilesInAnIcebergSplit() {
+    SplitAssigner assigner = splitAssigner();
+    Assertions.assertThatThrownBy(
+            () ->
+                assigner.onDiscoveredSplits(
+                    SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 2, "2")),
+            "Multiple files in a split is not allowed")
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Please use 'split-open-file-cost'");
+  }
+
+  /**
+   * Test sorted splits: the splits are handed to the assigner out of order, and must come back
+   * ordered by file sequence number (1 to 5).
+   */
+  @Test
+  public void testSplitSort() throws Exception {
+    SplitAssigner assigner = splitAssigner();
+    // Format version "2" is requested so that the data files carry file sequence numbers
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 5, 1, "2");
+
+    // Deliberately discover the splits in a shuffled order
+    assigner.onDiscoveredSplits(splits.subList(3, 5));
+    assigner.onDiscoveredSplits(splits.subList(0, 1));
+    assigner.onDiscoveredSplits(splits.subList(1, 3));
+
+    assertGetNext(assigner, 1L);
+    assertGetNext(assigner, 2L);
+    assertGetNext(assigner, 3L);
+    assertGetNext(assigner, 4L);
+    assertGetNext(assigner, 5L);
+
+    assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
+  }
+
+  /** Test that the comparator can be serialized to bytes and deserialized again. */
+  @Test
+  public void testSerializable() {
+    byte[] bytes = SerializationUtil.serializeToBytes(SplitComparators.fileSequenceNumber());
+    SerializableComparator<IcebergSourceSplit> comparator =
+        SerializationUtil.deserializeFromBytes(bytes);
+    Assert.assertNotNull(comparator);
+  }
+
+  /**
+   * Requests the next split from the assigner and asserts the file sequence number of the first
+   * file in that split.
+   *
+   * @param assigner the assigner to request the next split from
+   * @param expectedSequenceNumber the file sequence number the returned split's file must have
+   */
+  protected void assertGetNext(SplitAssigner assigner, Long expectedSequenceNumber) {
+    GetSplitResult result = assigner.getNext(null);
+    // Wildcard instead of the raw type: only fileSequenceNumber() is needed here
+    ContentFile<?> file = result.split().task().files().iterator().next().file();
+    Assert.assertEquals(expectedSequenceNumber, file.fileSequenceNumber());
+  }
+}
diff --git
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
index d0ae8fdf77..349eb11cf5 100644
---
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
+++
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
@@ -28,7 +28,7 @@ import
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumerator
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.SplitHelpers;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;
-import org.apache.iceberg.flink.source.assigner.SimpleSplitAssigner;
+import org.apache.iceberg.flink.source.assigner.DefaultSplitAssigner;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
@@ -342,7 +342,7 @@ public class TestContinuousIcebergEnumerator {
ContinuousIcebergEnumerator enumerator =
new ContinuousIcebergEnumerator(
context,
- new SimpleSplitAssigner(Collections.emptyList()),
+ new DefaultSplitAssigner(null, Collections.emptyList()),
scanContext,
splitPlanner,
null);
diff --git
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
index 56af0caf12..def4f43685 100644
---
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
+++
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
@@ -35,6 +35,7 @@ import
org.apache.iceberg.encryption.PlaintextEncryptionManager;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.junit.Assert;
import org.junit.ClassRule;
@@ -55,13 +56,68 @@ public class TestIcebergSourceReader {
TestingReaderOutput<RowData> readerOutput = new TestingReaderOutput<>();
TestingMetricGroup metricGroup = new TestingMetricGroup();
TestingReaderContext readerContext = new TestingReaderContext(new
Configuration(), metricGroup);
- IcebergSourceReader reader = createReader(metricGroup, readerContext);
+ IcebergSourceReader reader = createReader(metricGroup, readerContext,
null);
reader.start();
testOneSplitFetcher(reader, readerOutput, metricGroup, 1);
testOneSplitFetcher(reader, readerOutput, metricGroup, 2);
}
+ @Test
+ public void testReaderOrder() throws Exception {
+ // Create 2 splits
+ List<List<Record>> recordBatchList1 =
+ ReaderUtil.createRecordBatchList(0L, TestFixtures.SCHEMA, 1, 1);
+ CombinedScanTask task1 =
+ ReaderUtil.createCombinedScanTask(
+ recordBatchList1, TEMPORARY_FOLDER, FileFormat.PARQUET,
appenderFactory);
+
+ List<List<Record>> recordBatchList2 =
+ ReaderUtil.createRecordBatchList(1L, TestFixtures.SCHEMA, 1, 1);
+ CombinedScanTask task2 =
+ ReaderUtil.createCombinedScanTask(
+ recordBatchList2, TEMPORARY_FOLDER, FileFormat.PARQUET,
appenderFactory);
+
+ // Sort the splits in one way
+ List<RowData> rowDataList1 =
+ read(
+ Arrays.asList(
+ IcebergSourceSplit.fromCombinedScanTask(task1),
+ IcebergSourceSplit.fromCombinedScanTask(task2)),
+ 2);
+
+ // Reverse the splits
+ List<RowData> rowDataList2 =
+ read(
+ Arrays.asList(
+ IcebergSourceSplit.fromCombinedScanTask(task2),
+ IcebergSourceSplit.fromCombinedScanTask(task1)),
+ 2);
+
+ // Check that the order of the elements is not changed
+ Assert.assertEquals(rowDataList1.get(0), rowDataList2.get(0));
+ Assert.assertEquals(rowDataList1.get(1), rowDataList2.get(1));
+ }
+
+ private List<RowData> read(List<IcebergSourceSplit> splits, long expected)
throws Exception {
+ TestingMetricGroup metricGroup = new TestingMetricGroup();
+ TestingReaderContext readerContext = new TestingReaderContext(new
Configuration(), metricGroup);
+ // Using IdBasedComparator, so we can have a deterministic order of the
splits
+ IcebergSourceReader reader = createReader(metricGroup, readerContext, new
IdBasedComparator());
+ reader.start();
+
+ reader.addSplits(splits);
+ TestingReaderOutput<RowData> readerOutput = new TestingReaderOutput<>();
+ while (readerOutput.getEmittedRecords().size() < expected) {
+ reader.pollNext(readerOutput);
+ }
+
+ reader.pollNext(readerOutput);
+
+ Assert.assertEquals(expected, readerOutput.getEmittedRecords().size());
+ return readerOutput.getEmittedRecords();
+ }
+
private void testOneSplitFetcher(
IcebergSourceReader reader,
TestingReaderOutput<RowData> readerOutput,
@@ -96,7 +152,9 @@ public class TestIcebergSourceReader {
}
private IcebergSourceReader createReader(
- MetricGroup metricGroup, SourceReaderContext readerContext) {
+ MetricGroup metricGroup,
+ SourceReaderContext readerContext,
+ SerializableComparator<IcebergSourceSplit> splitComparator) {
IcebergSourceReaderMetrics readerMetrics =
new IcebergSourceReaderMetrics(metricGroup, "db.tbl");
RowDataReaderFunction readerFunction =
@@ -109,6 +167,13 @@ public class TestIcebergSourceReader {
new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
new PlaintextEncryptionManager(),
Collections.emptyList());
- return new IcebergSourceReader<>(readerMetrics, readerFunction,
readerContext);
+ return new IcebergSourceReader<>(readerMetrics, readerFunction,
splitComparator, readerContext);
+ }
+
+ private static class IdBasedComparator implements
SerializableComparator<IcebergSourceSplit> {
+ @Override
+ public int compare(IcebergSourceSplit o1, IcebergSourceSplit o2) {
+ return o1.splitId().compareTo(o2.splitId());
+ }
}
}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index cbdd184870..d3859452a2 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -48,6 +48,8 @@ import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.source.assigner.OrderedSplitAssignerFactory;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
import org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator;
@@ -63,6 +65,7 @@ import org.apache.iceberg.flink.source.reader.ReaderFunction;
import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
@@ -76,6 +79,7 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
private final ScanContext scanContext;
private final ReaderFunction<T> readerFunction;
private final SplitAssignerFactory assignerFactory;
+ private final SerializableComparator<IcebergSourceSplit> splitComparator;
// Can't use SerializableTable as enumerator needs a regular table
// that can discover table changes
@@ -86,11 +90,13 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
ScanContext scanContext,
ReaderFunction<T> readerFunction,
SplitAssignerFactory assignerFactory,
+ SerializableComparator<IcebergSourceSplit> splitComparator,
Table table) {
this.tableLoader = tableLoader;
this.scanContext = scanContext;
this.readerFunction = readerFunction;
this.assignerFactory = assignerFactory;
+ this.splitComparator = splitComparator;
this.table = table;
}
@@ -146,7 +152,7 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext
readerContext) {
IcebergSourceReaderMetrics metrics =
new IcebergSourceReaderMetrics(readerContext.metricGroup(),
lazyTable().name());
- return new IcebergSourceReader<>(metrics, readerFunction, readerContext);
+ return new IcebergSourceReader<>(metrics, readerFunction, splitComparator,
readerContext);
}
@Override
@@ -209,6 +215,7 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
private TableLoader tableLoader;
private Table table;
private SplitAssignerFactory splitAssignerFactory;
+ private SerializableComparator<IcebergSourceSplit> splitComparator;
private ReaderFunction<T> readerFunction;
private ReadableConfig flinkConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -234,6 +241,12 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
return this;
}
+ public Builder<T> splitComparator(
+ SerializableComparator<IcebergSourceSplit> newSplitComparator) {
+ this.splitComparator = newSplitComparator;
+ return this;
+ }
+
public Builder<T> readerFunction(ReaderFunction<T> newReaderFunction) {
this.readerFunction = newReaderFunction;
return this;
@@ -462,10 +475,18 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
}
}
+ if (splitAssignerFactory == null) {
+ if (splitComparator == null) {
+ splitAssignerFactory = new SimpleSplitAssignerFactory();
+ } else {
+ splitAssignerFactory = new
OrderedSplitAssignerFactory(splitComparator);
+ }
+ }
+
checkRequired();
// Since builder already load the table, pass it to the source to avoid
double loading
return new IcebergSource<T>(
- tableLoader, context, readerFunction, splitAssignerFactory, table);
+ tableLoader, context, readerFunction, splitAssignerFactory,
splitComparator, table);
}
private void checkRequired() {
diff --git
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
similarity index 82%
rename from
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
rename to
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
index b43660f907..37a0f1a605 100644
---
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
@@ -20,7 +20,8 @@ package org.apache.iceberg.flink.source.assigner;
import java.util.ArrayDeque;
import java.util.Collection;
-import java.util.Deque;
+import java.util.PriorityQueue;
+import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -28,24 +29,27 @@ import org.apache.flink.annotation.Internal;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
/**
* Since all methods are called in the source coordinator thread by
enumerator, there is no need for
* locking.
*/
@Internal
-public class SimpleSplitAssigner implements SplitAssigner {
+public class DefaultSplitAssigner implements SplitAssigner {
- private final Deque<IcebergSourceSplit> pendingSplits;
+ private final Queue<IcebergSourceSplit> pendingSplits;
private CompletableFuture<Void> availableFuture;
- public SimpleSplitAssigner() {
- this.pendingSplits = new ArrayDeque<>();
+ public DefaultSplitAssigner(SerializableComparator<IcebergSourceSplit>
comparator) {
+ this.pendingSplits = comparator == null ? new ArrayDeque<>() : new
PriorityQueue<>(comparator);
}
- public SimpleSplitAssigner(Collection<IcebergSourceSplitState>
assignerState) {
- this.pendingSplits = new ArrayDeque<>(assignerState.size());
- // Because simple assigner only tracks unassigned splits,
+ public DefaultSplitAssigner(
+ SerializableComparator<IcebergSourceSplit> comparator,
+ Collection<IcebergSourceSplitState> assignerState) {
+ this(comparator);
+ // Because default assigner only tracks unassigned splits,
// there is no need to filter splits based on status (unassigned) here.
assignerState.forEach(splitState -> pendingSplits.add(splitState.split()));
}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/OrderedSplitAssignerFactory.java
similarity index 54%
copy from
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
copy to
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/OrderedSplitAssignerFactory.java
index 1c14f4fcf9..e58478897a 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/OrderedSplitAssignerFactory.java
@@ -19,18 +19,28 @@
package org.apache.iceberg.flink.source.assigner;
import java.util.Collection;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
-/** Create simple assigner that hands out splits without any guarantee in
order or locality. */
-public class SimpleSplitAssignerFactory implements SplitAssignerFactory {
+/**
+ * Create default assigner with a comparator that hands out splits where the
order of the splits
+ * will be defined by the {@link SerializableComparator}.
+ */
+public class OrderedSplitAssignerFactory implements SplitAssignerFactory {
+ private final SerializableComparator<IcebergSourceSplit> comparator;
+
+ public
OrderedSplitAssignerFactory(SerializableComparator<IcebergSourceSplit>
comparator) {
+ this.comparator = comparator;
+ }
@Override
- public SimpleSplitAssigner createAssigner() {
- return new SimpleSplitAssigner();
+ public SplitAssigner createAssigner() {
+ return new DefaultSplitAssigner(comparator);
}
@Override
- public SimpleSplitAssigner
createAssigner(Collection<IcebergSourceSplitState> assignerState) {
- return new SimpleSplitAssigner(assignerState);
+ public SplitAssigner createAssigner(Collection<IcebergSourceSplitState>
assignerState) {
+ return new DefaultSplitAssigner(comparator, assignerState);
}
}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
index 1c14f4fcf9..a2e2ff364d 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
@@ -23,14 +23,15 @@ import
org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
/** Create simple assigner that hands out splits without any guarantee in
order or locality. */
public class SimpleSplitAssignerFactory implements SplitAssignerFactory {
+ public SimpleSplitAssignerFactory() {}
@Override
- public SimpleSplitAssigner createAssigner() {
- return new SimpleSplitAssigner();
+ public SplitAssigner createAssigner() {
+ return new DefaultSplitAssigner(null);
}
@Override
- public SimpleSplitAssigner
createAssigner(Collection<IcebergSourceSplitState> assignerState) {
- return new SimpleSplitAssigner(assignerState);
+ public SplitAssigner createAssigner(Collection<IcebergSourceSplitState>
assignerState) {
+ return new DefaultSplitAssigner(null, assignerState);
}
}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
index 84c98055ce..8d7d68f961 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
@@ -25,6 +25,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceReaderContext;
import
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitRequestEvent;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -36,9 +37,10 @@ public class IcebergSourceReader<T>
public IcebergSourceReader(
IcebergSourceReaderMetrics metrics,
ReaderFunction<T> readerFunction,
+ SerializableComparator<IcebergSourceSplit> splitComparator,
SourceReaderContext context) {
super(
- () -> new IcebergSourceSplitReader<>(metrics, readerFunction, context),
+ () -> new IcebergSourceSplitReader<>(metrics, readerFunction,
splitComparator, context),
new IcebergSourceRecordEmitter<>(),
context.getConfiguration(),
context);
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
index bb016671e6..4e270dfa3d 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Collections;
+import java.util.List;
import java.util.Queue;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
@@ -31,7 +32,9 @@ import
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +43,7 @@ class IcebergSourceSplitReader<T> implements
SplitReader<RecordAndPosition<T>, I
private final IcebergSourceReaderMetrics metrics;
private final ReaderFunction<T> openSplitFunction;
+ private final SerializableComparator<IcebergSourceSplit> splitComparator;
private final int indexOfSubtask;
private final Queue<IcebergSourceSplit> splits;
@@ -50,9 +54,11 @@ class IcebergSourceSplitReader<T> implements
SplitReader<RecordAndPosition<T>, I
IcebergSourceSplitReader(
IcebergSourceReaderMetrics metrics,
ReaderFunction<T> openSplitFunction,
+ SerializableComparator<IcebergSourceSplit> splitComparator,
SourceReaderContext context) {
this.metrics = metrics;
this.openSplitFunction = openSplitFunction;
+ this.splitComparator = splitComparator;
this.indexOfSubtask = context.getIndexOfSubtask();
this.splits = new ArrayDeque<>();
}
@@ -93,8 +99,15 @@ class IcebergSourceSplitReader<T> implements
SplitReader<RecordAndPosition<T>, I
String.format("Unsupported split change: %s",
splitsChange.getClass()));
}
- LOG.info("Add {} splits to reader", splitsChange.splits().size());
- splits.addAll(splitsChange.splits());
+ if (splitComparator != null) {
+ List<IcebergSourceSplit> newSplits =
Lists.newArrayList(splitsChange.splits());
+ newSplits.sort(splitComparator);
+ LOG.info("Add {} splits to reader: {}", newSplits.size(), newSplits);
+ splits.addAll(newSplits);
+ } else {
+ LOG.info("Add {} splits to reader", splitsChange.splits().size());
+ splits.addAll(splitsChange.splits());
+ }
metrics.incrementAssignedSplits(splitsChange.splits().size());
metrics.incrementAssignedBytes(calculateBytes(splitsChange));
}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializableComparator.java
similarity index 57%
copy from
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
copy to
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializableComparator.java
index 1c14f4fcf9..319648ca27 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializableComparator.java
@@ -16,21 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.flink.source.assigner;
+package org.apache.iceberg.flink.source.split;
-import java.util.Collection;
-import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import java.io.Serializable;
+import java.util.Comparator;
-/** Create simple assigner that hands out splits without any guarantee in
order or locality. */
-public class SimpleSplitAssignerFactory implements SplitAssignerFactory {
-
- @Override
- public SimpleSplitAssigner createAssigner() {
- return new SimpleSplitAssigner();
- }
-
- @Override
- public SimpleSplitAssigner
createAssigner(Collection<IcebergSourceSplitState> assignerState) {
- return new SimpleSplitAssigner(assignerState);
- }
-}
+public interface SerializableComparator<T> extends Comparator<T>, Serializable
{}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
new file mode 100644
index 0000000000..64e03d77de
--- /dev/null
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.split;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Provides implementations of {@link
+ * org.apache.iceberg.flink.source.split.SerializableComparator} which could be used for ordering
+ * splits. These are used by the {@link
+ * org.apache.iceberg.flink.source.assigner.OrderedSplitAssignerFactory} and the {@link
+ * org.apache.iceberg.flink.source.reader.IcebergSourceReader}.
+ */
+public class SplitComparators {
+  private SplitComparators() {}
+
+  /**
+   * Comparator which orders the splits based on the file sequence number of the data files.
+   *
+   * <p>Both compared splits must contain exactly one file, otherwise the comparison fails the
+   * argument check. The file sequence number of both files must be non-null. Splits with the same
+   * file sequence number are ordered by their split id.
+   *
+   * @return a serializable comparator ordering splits by file sequence number, then by split id
+   */
+  public static SerializableComparator<IcebergSourceSplit> fileSequenceNumber() {
+    return (IcebergSourceSplit o1, IcebergSourceSplit o2) -> {
+      Preconditions.checkArgument(
+          o1.task().files().size() == 1 && o2.task().files().size() == 1,
+          "Could not compare combined task. Please use 'split-open-file-cost' to prevent combining multiple files to a split");
+
+      Long seq1 = o1.task().files().iterator().next().file().fileSequenceNumber();
+      Long seq2 = o2.task().files().iterator().next().file().fileSequenceNumber();
+
+      Preconditions.checkNotNull(
+          seq1,
+          "Invalid file sequence number: null. Doesn't support splits written with V1 format: %s",
+          o1);
+      // Same message as for the first operand (previously misspelled as "IInvalid")
+      Preconditions.checkNotNull(
+          seq2,
+          "Invalid file sequence number: null. Doesn't support splits written with V1 format: %s",
+          o2);
+
+      int temp = Long.compare(seq1, seq2);
+      if (temp != 0) {
+        return temp;
+      } else {
+        // Tie-break on the split id when the sequence numbers are equal
+        return o1.splitId().compareTo(o2.splitId());
+      }
+    };
+  }
+}
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
index 8dc68aad10..3a8071523b 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
@@ -20,19 +20,21 @@ package org.apache.iceberg.flink.source;
import java.io.File;
import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.ThreadPools;
import org.junit.Assert;
@@ -40,8 +42,6 @@ import org.junit.rules.TemporaryFolder;
public class SplitHelpers {
- private static final AtomicLong splitLengthIncrement = new AtomicLong();
-
private SplitHelpers() {}
/**
@@ -54,16 +54,53 @@ public class SplitHelpers {
*
* <p>Since the table and data files are deleted before this method
return, caller shouldn't
* attempt to read the data files.
+ *
+ * <p>By default, a v1 Iceberg table is created. For a v2 table use {@link
+ * SplitHelpers#createSplitsFromTransientHadoopTable(TemporaryFolder,
int, int, String)}
+ *
+ * @param temporaryFolder Folder to place the data to
+ * @param fileCount The number of files to create and add to the table
+ * @param filesPerSplit The number of files used for a split
*/
public static List<IcebergSourceSplit> createSplitsFromTransientHadoopTable(
TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit)
throws Exception {
+ return createSplitsFromTransientHadoopTable(temporaryFolder, fileCount,
filesPerSplit, "1");
+ }
+
+ /**
+ * This creates a list of IcebergSourceSplit from real files
+ * <li>Create a new Hadoop table under the {@code temporaryFolder}
+ * <li>write {@code fileCount} number of files to the new Iceberg table
+ * <li>Discover the splits from the table and partition the splits by the
{@code filesPerSplit}
+ * limit
+ * <li>Delete the Hadoop table
+ *
+ * <p>Since the table and data files are deleted before this method
returns, the caller shouldn't
+ * attempt to read the data files.
+ *
+ * @param temporaryFolder Folder to place the data to
+ * @param fileCount The number of files to create and add to the table
+ * @param filesPerSplit The number of files used for a split
+ * @param version The table version to create
+ */
+ public static List<IcebergSourceSplit> createSplitsFromTransientHadoopTable(
+ TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit,
String version)
+ throws Exception {
final File warehouseFile = temporaryFolder.newFolder();
Assert.assertTrue(warehouseFile.delete());
final String warehouse = "file:" + warehouseFile;
Configuration hadoopConf = new Configuration();
final HadoopCatalog catalog = new HadoopCatalog(hadoopConf, warehouse);
+ ImmutableMap<String, String> properties =
+ ImmutableMap.of(TableProperties.FORMAT_VERSION, version);
try {
- final Table table = catalog.createTable(TestFixtures.TABLE_IDENTIFIER,
TestFixtures.SCHEMA);
+ final Table table =
+ catalog.createTable(
+ TestFixtures.TABLE_IDENTIFIER,
+ TestFixtures.SCHEMA,
+ PartitionSpec.unpartitioned(),
+ null,
+ properties);
final GenericAppenderHelper dataAppender =
new GenericAppenderHelper(table, FileFormat.PARQUET,
temporaryFolder);
for (int i = 0; i < fileCount; ++i) {
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestSimpleSplitAssigner.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
similarity index 88%
rename from
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestSimpleSplitAssigner.java
rename to
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
index ee6c5cc3a6..f28677ca9d 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestSimpleSplitAssigner.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
@@ -30,22 +30,24 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-public class TestSimpleSplitAssigner {
+public abstract class SplitAssignerTestBase {
@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
@Test
public void testEmptyInitialization() {
- SimpleSplitAssigner assigner = new SimpleSplitAssigner();
+ SplitAssigner assigner = splitAssigner();
assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
}
/** Test a sequence of interactions for StaticEnumerator */
@Test
public void testStaticEnumeratorSequence() throws Exception {
- SimpleSplitAssigner assigner = new SimpleSplitAssigner();
+ SplitAssigner assigner = splitAssigner();
assigner.onDiscoveredSplits(
- SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4,
2));
+ SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4,
1));
+ assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+ assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
assertSnapshot(assigner, 1);
assigner.onUnassignedSplits(
@@ -61,7 +63,7 @@ public class TestSimpleSplitAssigner {
/** Test a sequence of interactions for ContinuousEnumerator */
@Test
public void testContinuousEnumeratorSequence() throws Exception {
- SimpleSplitAssigner assigner = new SimpleSplitAssigner();
+ SplitAssigner assigner = splitAssigner();
assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
List<IcebergSourceSplit> splits1 =
@@ -81,7 +83,7 @@ public class TestSimpleSplitAssigner {
}
private void assertAvailableFuture(
- SimpleSplitAssigner assigner, int splitCount, Runnable
addSplitsRunnable) {
+ SplitAssigner assigner, int splitCount, Runnable addSplitsRunnable) {
// register callback
AtomicBoolean futureCompleted = new AtomicBoolean();
CompletableFuture<Void> future = assigner.isAvailable();
@@ -102,7 +104,7 @@ public class TestSimpleSplitAssigner {
assertSnapshot(assigner, 0);
}
- private void assertGetNext(SimpleSplitAssigner assigner,
GetSplitResult.Status expectedStatus) {
+ protected void assertGetNext(SplitAssigner assigner, GetSplitResult.Status
expectedStatus) {
GetSplitResult result = assigner.getNext(null);
Assert.assertEquals(expectedStatus, result.status());
switch (expectedStatus) {
@@ -118,8 +120,10 @@ public class TestSimpleSplitAssigner {
}
}
- private void assertSnapshot(SimpleSplitAssigner assigner, int splitCount) {
+ protected void assertSnapshot(SplitAssigner assigner, int splitCount) {
Collection<IcebergSourceSplitState> stateBeforeGet = assigner.state();
Assert.assertEquals(splitCount, stateBeforeGet.size());
}
+
+ protected abstract SplitAssigner splitAssigner();
}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestDefaultSplitAssigner.java
similarity index 52%
copy from
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
copy to
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestDefaultSplitAssigner.java
index 1c14f4fcf9..8994f3054a 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssignerFactory.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestDefaultSplitAssigner.java
@@ -18,19 +18,26 @@
*/
package org.apache.iceberg.flink.source.assigner;
-import java.util.Collection;
-import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
-
-/** Create simple assigner that hands out splits without any guarantee in
order or locality. */
-public class SimpleSplitAssignerFactory implements SplitAssignerFactory {
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.junit.Test;
+public class TestDefaultSplitAssigner extends SplitAssignerTestBase {
@Override
- public SimpleSplitAssigner createAssigner() {
- return new SimpleSplitAssigner();
+ protected SplitAssigner splitAssigner() {
+ return new DefaultSplitAssigner(null);
}
- @Override
- public SimpleSplitAssigner
createAssigner(Collection<IcebergSourceSplitState> assignerState) {
- return new SimpleSplitAssigner(assignerState);
+ /** Test the assigner when multiple files are in a single split */
+ @Test
+ public void testMultipleFilesInASplit() throws Exception {
+ SplitAssigner assigner = splitAssigner();
+ assigner.onDiscoveredSplits(
+ SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4,
2));
+
+ assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+ assertSnapshot(assigner, 1);
+ assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+ assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
+ assertSnapshot(assigner, 0);
}
}
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
new file mode 100644
index 0000000000..8b9e132e0e
--- /dev/null
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.List;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
+import org.apache.iceberg.flink.source.split.SplitComparators;
+import org.apache.iceberg.util.SerializationUtil;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFileSequenceNumberBasedSplitAssigner extends
SplitAssignerTestBase {
+ @Override
+ protected SplitAssigner splitAssigner() {
+ return new
OrderedSplitAssignerFactory(SplitComparators.fileSequenceNumber()).createAssigner();
+ }
+
+ /** Test the assigner when multiple files are in a single split */
+ @Test
+ public void testMultipleFilesInAnIcebergSplit() {
+ SplitAssigner assigner = splitAssigner();
+ Assertions.assertThatThrownBy(
+ () ->
+ assigner.onDiscoveredSplits(
+
SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 2, "2")),
+ "Multiple files in a split is not allowed")
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Please use 'split-open-file-cost'");
+ }
+
+ /** Test sorted splits */
+ @Test
+ public void testSplitSort() throws Exception {
+ SplitAssigner assigner = splitAssigner();
+ List<IcebergSourceSplit> splits =
+ SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 5,
1, "2");
+
+ assigner.onDiscoveredSplits(splits.subList(3, 5));
+ assigner.onDiscoveredSplits(splits.subList(0, 1));
+ assigner.onDiscoveredSplits(splits.subList(1, 3));
+
+ assertGetNext(assigner, 1L);
+ assertGetNext(assigner, 2L);
+ assertGetNext(assigner, 3L);
+ assertGetNext(assigner, 4L);
+ assertGetNext(assigner, 5L);
+
+ assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
+ }
+
+ @Test
+ public void testSerializable() {
+ byte[] bytes =
SerializationUtil.serializeToBytes(SplitComparators.fileSequenceNumber());
+ SerializableComparator<IcebergSourceSplit> comparator =
+ SerializationUtil.deserializeFromBytes(bytes);
+ Assert.assertNotNull(comparator);
+ }
+
+ protected void assertGetNext(SplitAssigner assigner, Long
expectedSequenceNumber) {
+ GetSplitResult result = assigner.getNext(null);
+ ContentFile file = result.split().task().files().iterator().next().file();
+ Assert.assertEquals(expectedSequenceNumber, file.fileSequenceNumber());
+ }
+}
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
index d0ae8fdf77..349eb11cf5 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
@@ -28,7 +28,7 @@ import
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumerator
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.SplitHelpers;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;
-import org.apache.iceberg.flink.source.assigner.SimpleSplitAssigner;
+import org.apache.iceberg.flink.source.assigner.DefaultSplitAssigner;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
@@ -342,7 +342,7 @@ public class TestContinuousIcebergEnumerator {
ContinuousIcebergEnumerator enumerator =
new ContinuousIcebergEnumerator(
context,
- new SimpleSplitAssigner(Collections.emptyList()),
+ new DefaultSplitAssigner(null, Collections.emptyList()),
scanContext,
splitPlanner,
null);
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
index 56af0caf12..def4f43685 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
@@ -35,6 +35,7 @@ import
org.apache.iceberg.encryption.PlaintextEncryptionManager;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.junit.Assert;
import org.junit.ClassRule;
@@ -55,13 +56,68 @@ public class TestIcebergSourceReader {
TestingReaderOutput<RowData> readerOutput = new TestingReaderOutput<>();
TestingMetricGroup metricGroup = new TestingMetricGroup();
TestingReaderContext readerContext = new TestingReaderContext(new
Configuration(), metricGroup);
- IcebergSourceReader reader = createReader(metricGroup, readerContext);
+ IcebergSourceReader reader = createReader(metricGroup, readerContext,
null);
reader.start();
testOneSplitFetcher(reader, readerOutput, metricGroup, 1);
testOneSplitFetcher(reader, readerOutput, metricGroup, 2);
}
+ @Test
+ public void testReaderOrder() throws Exception {
+ // Create 2 splits
+ List<List<Record>> recordBatchList1 =
+ ReaderUtil.createRecordBatchList(0L, TestFixtures.SCHEMA, 1, 1);
+ CombinedScanTask task1 =
+ ReaderUtil.createCombinedScanTask(
+ recordBatchList1, TEMPORARY_FOLDER, FileFormat.PARQUET,
appenderFactory);
+
+ List<List<Record>> recordBatchList2 =
+ ReaderUtil.createRecordBatchList(1L, TestFixtures.SCHEMA, 1, 1);
+ CombinedScanTask task2 =
+ ReaderUtil.createCombinedScanTask(
+ recordBatchList2, TEMPORARY_FOLDER, FileFormat.PARQUET,
appenderFactory);
+
+ // Sort the splits in one way
+ List<RowData> rowDataList1 =
+ read(
+ Arrays.asList(
+ IcebergSourceSplit.fromCombinedScanTask(task1),
+ IcebergSourceSplit.fromCombinedScanTask(task2)),
+ 2);
+
+ // Reverse the splits
+ List<RowData> rowDataList2 =
+ read(
+ Arrays.asList(
+ IcebergSourceSplit.fromCombinedScanTask(task2),
+ IcebergSourceSplit.fromCombinedScanTask(task1)),
+ 2);
+
+ // Check that the order of the elements is not changed
+ Assert.assertEquals(rowDataList1.get(0), rowDataList2.get(0));
+ Assert.assertEquals(rowDataList1.get(1), rowDataList2.get(1));
+ }
+
+ private List<RowData> read(List<IcebergSourceSplit> splits, long expected)
throws Exception {
+ TestingMetricGroup metricGroup = new TestingMetricGroup();
+ TestingReaderContext readerContext = new TestingReaderContext(new
Configuration(), metricGroup);
+ // Using IdBasedComparator, so we can have a deterministic order of the
splits
+ IcebergSourceReader reader = createReader(metricGroup, readerContext, new
IdBasedComparator());
+ reader.start();
+
+ reader.addSplits(splits);
+ TestingReaderOutput<RowData> readerOutput = new TestingReaderOutput<>();
+ while (readerOutput.getEmittedRecords().size() < expected) {
+ reader.pollNext(readerOutput);
+ }
+
+ reader.pollNext(readerOutput);
+
+ Assert.assertEquals(expected, readerOutput.getEmittedRecords().size());
+ return readerOutput.getEmittedRecords();
+ }
+
private void testOneSplitFetcher(
IcebergSourceReader reader,
TestingReaderOutput<RowData> readerOutput,
@@ -96,7 +152,9 @@ public class TestIcebergSourceReader {
}
private IcebergSourceReader createReader(
- MetricGroup metricGroup, SourceReaderContext readerContext) {
+ MetricGroup metricGroup,
+ SourceReaderContext readerContext,
+ SerializableComparator<IcebergSourceSplit> splitComparator) {
IcebergSourceReaderMetrics readerMetrics =
new IcebergSourceReaderMetrics(metricGroup, "db.tbl");
RowDataReaderFunction readerFunction =
@@ -109,6 +167,13 @@ public class TestIcebergSourceReader {
new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
new PlaintextEncryptionManager(),
Collections.emptyList());
- return new IcebergSourceReader<>(readerMetrics, readerFunction,
readerContext);
+ return new IcebergSourceReader<>(readerMetrics, readerFunction,
splitComparator, readerContext);
+ }
+
+ private static class IdBasedComparator implements
SerializableComparator<IcebergSourceSplit> {
+ @Override
+ public int compare(IcebergSourceSplit o1, IcebergSourceSplit o2) {
+ return o1.splitId().compareTo(o2.splitId());
+ }
}
}