This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 5e059c1bfb Flink: Backport #8553 to v1.15, v1.16 (#9145)
5e059c1bfb is described below
commit 5e059c1bfb664cc1880883425bfd68b2a8df3190
Author: pvary <[email protected]>
AuthorDate: Tue Nov 28 22:59:44 2023 +0100
Flink: Backport #8553 to v1.15, v1.16 (#9145)
---
.../apache/iceberg/flink/source/IcebergSource.java | 68 ++-
.../reader/ColumnStatsWatermarkExtractor.java | 98 +++++
.../flink/source/reader/IcebergSourceReader.java | 3 +-
...Emitter.java => SerializableRecordEmitter.java} | 24 +-
.../source/reader/SplitWatermarkExtractor.java} | 18 +-
.../reader/WatermarkExtractorRecordEmitter.java | 67 +++
.../flink/source/split/SplitComparators.java | 19 +-
.../flink/source/TestIcebergSourceFailover.java | 12 +-
...cebergSourceFailoverWithWatermarkExtractor.java | 112 +++++
.../TestIcebergSourceWithWatermarkExtractor.java | 481 +++++++++++++++++++++
.../source/assigner/SplitAssignerTestBase.java | 21 +-
.../TestFileSequenceNumberBasedSplitAssigner.java | 10 +-
.../assigner/TestWatermarkBasedSplitAssigner.java | 146 +++++++
.../iceberg/flink/source/reader/ReaderUtil.java | 8 +-
.../reader/TestColumnStatsWatermarkExtractor.java | 178 ++++++++
.../source/reader/TestIcebergSourceReader.java | 7 +-
.../apache/iceberg/flink/source/IcebergSource.java | 68 ++-
.../reader/ColumnStatsWatermarkExtractor.java | 98 +++++
.../flink/source/reader/IcebergSourceReader.java | 3 +-
.../source/reader/SerializableRecordEmitter.java} | 24 +-
.../source/reader/SplitWatermarkExtractor.java} | 18 +-
.../reader/WatermarkExtractorRecordEmitter.java | 67 +++
.../flink/source/split/SplitComparators.java | 19 +-
.../flink/source/TestIcebergSourceFailover.java | 12 +-
...cebergSourceFailoverWithWatermarkExtractor.java | 112 +++++
.../TestIcebergSourceWithWatermarkExtractor.java | 451 +++++++++++++++++++
.../source/assigner/SplitAssignerTestBase.java | 21 +-
.../TestFileSequenceNumberBasedSplitAssigner.java | 10 +-
.../assigner/TestWatermarkBasedSplitAssigner.java | 146 +++++++
.../iceberg/flink/source/reader/ReaderUtil.java | 8 +-
.../reader/TestColumnStatsWatermarkExtractor.java | 178 ++++++++
.../source/reader/TestIcebergSourceReader.java | 7 +-
32 files changed, 2408 insertions(+), 106 deletions(-)
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index f85f627726..179253cb3a 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.source.Boundedness;
@@ -58,15 +59,20 @@ import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
+import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
import org.apache.iceberg.flink.source.reader.MetaDataReaderFunction;
import org.apache.iceberg.flink.source.reader.ReaderFunction;
import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
+import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter;
+import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
import org.apache.iceberg.flink.source.split.SerializableComparator;
+import org.apache.iceberg.flink.source.split.SplitComparators;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +86,7 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
private final ReaderFunction<T> readerFunction;
private final SplitAssignerFactory assignerFactory;
private final SerializableComparator<IcebergSourceSplit> splitComparator;
+ private final SerializableRecordEmitter<T> emitter;
// Can't use SerializableTable as enumerator needs a regular table
// that can discover table changes
@@ -91,13 +98,15 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
ReaderFunction<T> readerFunction,
SplitAssignerFactory assignerFactory,
SerializableComparator<IcebergSourceSplit> splitComparator,
- Table table) {
+ Table table,
+ SerializableRecordEmitter<T> emitter) {
this.tableLoader = tableLoader;
this.scanContext = scanContext;
this.readerFunction = readerFunction;
this.assignerFactory = assignerFactory;
this.splitComparator = splitComparator;
this.table = table;
+ this.emitter = emitter;
}
String name() {
@@ -152,7 +161,8 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) {
IcebergSourceReaderMetrics metrics =
new IcebergSourceReaderMetrics(readerContext.metricGroup(), lazyTable().name());
- return new IcebergSourceReader<>(metrics, readerFunction, splitComparator, readerContext);
+ return new IcebergSourceReader<>(
+ emitter, metrics, readerFunction, splitComparator, readerContext);
}
@Override
@@ -216,6 +226,8 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
private Table table;
private SplitAssignerFactory splitAssignerFactory;
private SerializableComparator<IcebergSourceSplit> splitComparator;
+ private String watermarkColumn;
+ private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS;
private ReaderFunction<T> readerFunction;
private ReadableConfig flinkConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -237,6 +249,9 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
}
public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
+ Preconditions.checkArgument(
+ watermarkColumn == null,
+ "Watermark column and SplitAssigner should not be set in the same source");
this.splitAssignerFactory = assignerFactory;
return this;
}
@@ -429,6 +444,33 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
return this;
}
+ /**
+ * Emits watermarks once per split based on the min value of column statistics from file
+ * metadata in the given split. The generated watermarks are also used for ordering the splits
+ * for read. Accepted column types are timestamp/timestamptz/long. For long columns consider
+ * setting {@link #watermarkTimeUnit(TimeUnit)}.
+ *
+ * <p>Consider setting `read.split.open-file-cost` to prevent combining small files into a single
+ * split when the watermark is used for watermark alignment.
+ */
+ public Builder<T> watermarkColumn(String columnName) {
+ Preconditions.checkArgument(
+ splitAssignerFactory == null,
+ "Watermark column and SplitAssigner should not be set in the same source");
+ this.watermarkColumn = columnName;
+ return this;
+ }
+
+ /**
+ * When the type of the {@link #watermarkColumn} is {@link
+ * org.apache.iceberg.types.Types.LongType}, then sets the {@link TimeUnit} to convert the
+ * value. The default value is {@link TimeUnit#MICROSECONDS}.
+ */
+ public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) {
+ this.watermarkTimeUnit = timeUnit;
+ return this;
+ }
+
/** @deprecated Use {@link #setAll} instead. */
@Deprecated
public Builder<T> properties(Map<String, String> properties) {
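For context, the new options compose with the existing builder API; a minimal usage sketch (assumptions: a table whose long column "event_ts_micros" stores microseconds since the epoch, and an already constructed tableLoader):

    IcebergSource<RowData> source =
        IcebergSource.<RowData>builder()
            .tableLoader(tableLoader)
            // order splits and emit per-split watermarks from this column's stats
            .watermarkColumn("event_ts_micros")
            // only consulted for long columns; timestamp columns always use microseconds
            .watermarkTimeUnit(TimeUnit.MICROSECONDS)
            .streaming(true)
            .build();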
@@ -453,6 +495,18 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
}
+ SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
+ if (watermarkColumn != null) {
+ // Column statistics are needed for watermark generation
+ contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));
+
+ SplitWatermarkExtractor watermarkExtractor =
+ new ColumnStatsWatermarkExtractor(icebergSchema, watermarkColumn, watermarkTimeUnit);
+ emitter = SerializableRecordEmitter.emitterWithWatermark(watermarkExtractor);
+ splitAssignerFactory =
+ new OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor));
+ }
+
ScanContext context = contextBuilder.build();
if (readerFunction == null) {
if (table instanceof BaseMetadataTable) {
@@ -485,8 +539,14 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
checkRequired();
// Since the builder already loads the table, pass it to the source to avoid double loading
- return new IcebergSource<T>(
- tableLoader, context, readerFunction, splitAssignerFactory, splitComparator, table);
+ return new IcebergSource<>(
+ tableLoader,
+ context,
+ readerFunction,
+ splitAssignerFactory,
+ splitComparator,
+ table,
+ emitter);
}
private void checkRequired() {
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java
new file mode 100644
index 0000000000..4bb6f0a98c
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type.TypeID;
+import org.apache.iceberg.types.Types;
+
+/**
+ * {@link SplitWatermarkExtractor} implementation which uses Iceberg timestamp column statistics
+ * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is emitted by the
+ * {@link WatermarkExtractorRecordEmitter} along with the actual records.
+ */
+@Internal
+public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, Serializable {
+ private final int eventTimeFieldId;
+ private final String eventTimeFieldName;
+ private final TimeUnit timeUnit;
+
+ /**
+ * Creates the extractor.
+ *
+ * @param schema The schema of the Table
+ * @param eventTimeFieldName The column which should be used as an event time
+ * @param timeUnit Used for converting the long value to epoch milliseconds
+ */
+ public ColumnStatsWatermarkExtractor(
+ Schema schema, String eventTimeFieldName, TimeUnit timeUnit) {
+ Types.NestedField field = schema.findField(eventTimeFieldName);
+ TypeID typeID = field.type().typeId();
+ Preconditions.checkArgument(
+ typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP),
+ "Found %s, expected a LONG or TIMESTAMP column for watermark generation.",
+ typeID);
+ this.eventTimeFieldId = field.fieldId();
+ this.eventTimeFieldName = eventTimeFieldName;
+ // Use the timeUnit only for Long columns.
+ this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : TimeUnit.MICROSECONDS;
+ }
+
+ @VisibleForTesting
+ ColumnStatsWatermarkExtractor(int eventTimeFieldId, String eventTimeFieldName) {
+ this.eventTimeFieldId = eventTimeFieldId;
+ this.eventTimeFieldName = eventTimeFieldName;
+ this.timeUnit = TimeUnit.MICROSECONDS;
+ }
+
+ /**
+ * Get the watermark for a split using column statistics.
+ *
+ * @param split The split
+ * @return The watermark
+ * @throws IllegalArgumentException if there are no statistics for the column
+ */
+ @Override
+ public long extractWatermark(IcebergSourceSplit split) {
+ return split.task().files().stream()
+ .map(
+ scanTask -> {
+ Preconditions.checkArgument(
+ scanTask.file().lowerBounds() != null
+ && scanTask.file().lowerBounds().get(eventTimeFieldId) != null,
+ "Missing statistics for column name = %s in file = %s",
+ eventTimeFieldName,
+ scanTask.file());
+ return timeUnit.toMillis(
+ Conversions.fromByteBuffer(
+ Types.LongType.get(), scanTask.file().lowerBounds().get(eventTimeFieldId)));
+ })
+ .min(Comparator.comparingLong(l -> l))
+ .get();
+ }
+}
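Iceberg stores timestamp lower bounds as microseconds since the epoch, so the extractor returns the smallest lower bound across the split's files converted to milliseconds. A brief usage sketch (the table and split instances are assumed):

    SplitWatermarkExtractor extractor =
        new ColumnStatsWatermarkExtractor(table.schema(), "ts", TimeUnit.MICROSECONDS);
    // the minimum "ts" lower bound of the split's files, in epoch milliseconds
    long watermarkMillis = extractor.extractWatermark(split);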
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
index 8d7d68f961..f143b8d2df 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
@@ -35,13 +35,14 @@ public class IcebergSourceReader<T>
RecordAndPosition<T>, T, IcebergSourceSplit, IcebergSourceSplit> {
public IcebergSourceReader(
+ SerializableRecordEmitter<T> emitter,
IcebergSourceReaderMetrics metrics,
ReaderFunction<T> readerFunction,
SerializableComparator<IcebergSourceSplit> splitComparator,
SourceReaderContext context) {
super(
() -> new IcebergSourceSplitReader<>(metrics, readerFunction, splitComparator, context),
- new IcebergSourceRecordEmitter<>(),
+ emitter,
context.getConfiguration(),
context);
}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/SerializableRecordEmitter.java
similarity index 61%
copy from flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
copy to flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/SerializableRecordEmitter.java
index 337d9d3c42..a6e2c1dae2 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/SerializableRecordEmitter.java
@@ -18,19 +18,23 @@
*/
package org.apache.iceberg.flink.source.reader;
-import org.apache.flink.api.connector.source.SourceOutput;
+import java.io.Serializable;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
-final class IcebergSourceRecordEmitter<T>
- implements RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit> {
-
- IcebergSourceRecordEmitter() {}
+@Internal
+@FunctionalInterface
+public interface SerializableRecordEmitter<T>
+ extends RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit>, Serializable {
+ static <T> SerializableRecordEmitter<T> defaultEmitter() {
+ return (element, output, split) -> {
+ output.collect(element.record());
+ split.updatePosition(element.fileOffset(), element.recordOffset());
+ };
+ }
- @Override
- public void emitRecord(
- RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {
- output.collect(element.record());
- split.updatePosition(element.fileOffset(), element.recordOffset());
+ static <T> SerializableRecordEmitter<T> emitterWithWatermark(SplitWatermarkExtractor extractor) {
+ return new WatermarkExtractorRecordEmitter<>(extractor);
}
}
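The two factory methods select between plain record emission and the watermark-emitting variant; a sketch of both (the extractor instance is an assumption):

    SerializableRecordEmitter<RowData> plain = SerializableRecordEmitter.defaultEmitter();
    SerializableRecordEmitter<RowData> withWatermarks =
        SerializableRecordEmitter.emitterWithWatermark(extractor);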
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/SplitWatermarkExtractor.java
similarity index 63%
rename from flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
rename to flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/SplitWatermarkExtractor.java
index 337d9d3c42..d1c50ac8ca 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/SplitWatermarkExtractor.java
@@ -18,19 +18,11 @@
*/
package org.apache.iceberg.flink.source.reader;
-import org.apache.flink.api.connector.source.SourceOutput;
-import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import java.io.Serializable;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
-final class IcebergSourceRecordEmitter<T>
- implements RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit> {
-
- IcebergSourceRecordEmitter() {}
-
- @Override
- public void emitRecord(
- RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {
- output.collect(element.record());
- split.updatePosition(element.fileOffset(), element.recordOffset());
- }
+/** The interface used to extract watermarks from splits. */
+public interface SplitWatermarkExtractor extends Serializable {
+ /** Get the watermark for a split. */
+ long extractWatermark(IcebergSourceSplit split);
}
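Since the interface has a single abstract method, a custom extractor can be written as a lambda; a toy sketch (a constant watermark, purely illustrative):

    SplitWatermarkExtractor constant = split -> 0L;
    SerializableRecordEmitter<RowData> emitter =
        SerializableRecordEmitter.emitterWithWatermark(constant);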
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java
new file mode 100644
index 0000000000..02ef57d344
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Emitter which emits the watermarks, records and updates the split position.
+ *
+ * <p>The Emitter emits watermarks at the beginning of every split provided by the {@link
+ * SplitWatermarkExtractor}.
+ */
+class WatermarkExtractorRecordEmitter<T> implements SerializableRecordEmitter<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(WatermarkExtractorRecordEmitter.class);
+ private final SplitWatermarkExtractor timeExtractor;
+ private String lastSplitId = null;
+ private long watermark;
+
+ WatermarkExtractorRecordEmitter(SplitWatermarkExtractor timeExtractor) {
+ this.timeExtractor = timeExtractor;
+ }
+
+ @Override
+ public void emitRecord(
+ RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {
+ if (!split.splitId().equals(lastSplitId)) {
+ long newWatermark = timeExtractor.extractWatermark(split);
+ if (newWatermark < watermark) {
+ LOG.info(
+ "Received a new split with lower watermark. Previous watermark = {}, current watermark = {}, previous split = {}, current split = {}",
+ watermark,
+ newWatermark,
+ lastSplitId,
+ split.splitId());
+ } else {
+ watermark = newWatermark;
+ output.emitWatermark(new Watermark(watermark));
+ LOG.debug("Watermark = {} emitted based on split = {}", watermark, lastSplitId);
+ }
+
+ lastSplitId = split.splitId();
+ }
+
+ output.collect(element.record());
+ split.updatePosition(element.fileOffset(), element.recordOffset());
+ }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
index 64e03d77de..56ee92014d 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.source.split;
+import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
/**
@@ -45,7 +46,7 @@ public class SplitComparators {
o1);
Preconditions.checkNotNull(
seq2,
- "IInvalid file sequence number: null. Doesn't support splits written with V1 format: %s",
+ "Invalid file sequence number: null. Doesn't support splits written with V1 format: %s",
o2);
int temp = Long.compare(seq1, seq2);
@@ -56,4 +57,20 @@ public class SplitComparators {
}
};
}
+
+ /** Comparator which orders the splits based on the watermark of the splits */
+ public static SerializableComparator<IcebergSourceSplit> watermark(
+ SplitWatermarkExtractor watermarkExtractor) {
+ return (IcebergSourceSplit o1, IcebergSourceSplit o2) -> {
+ long watermark1 = watermarkExtractor.extractWatermark(o1);
+ long watermark2 = watermarkExtractor.extractWatermark(o2);
+
+ int temp = Long.compare(watermark1, watermark2);
+ if (temp != 0) {
+ return temp;
+ } else {
+ return o1.splitId().compareTo(o2.splitId());
+ }
+ };
+ }
}
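Wiring this comparator into an ordered assigner mirrors what IcebergSource.Builder#build() does internally when a watermark column is set; a sketch (the schema and column name are assumptions):

    SplitWatermarkExtractor extractor =
        new ColumnStatsWatermarkExtractor(schema, "ts", TimeUnit.MICROSECONDS);
    SplitAssignerFactory assignerFactory =
        new OrderedSplitAssignerFactory(SplitComparators.watermark(extractor));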
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
index 70e7a79d83..7d991ee603 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
@@ -97,6 +98,11 @@ public class TestIcebergSourceFailover {
return RandomGenericData.generate(schema(), numRecords, seed);
}
+ protected void assertRecords(Table table, List<Record> expectedRecords, Duration timeout)
+ throws Exception {
+ SimpleDataUtil.assertTableRecords(table, expectedRecords, timeout);
+ }
+
@Test
public void testBoundedWithTaskManagerFailover() throws Exception {
testBoundedIcebergSource(FailoverType.TM);
@@ -150,8 +156,7 @@ public class TestIcebergSourceFailover {
RecordCounterToFail::continueProcessing,
miniClusterResource.getMiniCluster());
- SimpleDataUtil.assertTableRecords(
- sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+ assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
}
@Test
@@ -214,8 +219,7 @@ public class TestIcebergSourceFailover {
// wait longer for continuous source to reduce flakiness
// because CI servers tend to be overloaded.
- SimpleDataUtil.assertTableRecords(
- sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+ assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
}
// ------------------------------------------------------------------------
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java
new file mode 100644
index 0000000000..f7dc931c50
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+
+public class TestIcebergSourceFailoverWithWatermarkExtractor extends TestIcebergSourceFailover {
+ // Increment ts by 15 minutes for each generateRecords batch
+ private static final long RECORD_BATCH_TS_INCREMENT_MILLI = TimeUnit.MINUTES.toMillis(15);
+ // Within a batch, increment ts by 1 second
+ private static final long RECORD_TS_INCREMENT_MILLI = TimeUnit.SECONDS.toMillis(1);
+
+ private final AtomicLong tsMilli = new AtomicLong(System.currentTimeMillis());
+
+ @Override
+ protected IcebergSource.Builder<RowData> sourceBuilder() {
+ return IcebergSource.<RowData>builder()
+ .tableLoader(sourceTableResource.tableLoader())
+ .watermarkColumn("ts")
+ .project(TestFixtures.TS_SCHEMA);
+ }
+
+ @Override
+ protected Schema schema() {
+ return TestFixtures.TS_SCHEMA;
+ }
+
+ @Override
+ protected List<Record> generateRecords(int numRecords, long seed) {
+ // Override the ts field to create a more realistic situation for event time alignment
+ tsMilli.addAndGet(RECORD_BATCH_TS_INCREMENT_MILLI);
+ return RandomGenericData.generate(schema(), numRecords, seed).stream()
+ .peek(
+ record -> {
+ LocalDateTime ts =
+ LocalDateTime.ofInstant(
+ Instant.ofEpochMilli(tsMilli.addAndGet(RECORD_TS_INCREMENT_MILLI)),
+ ZoneId.of("Z"));
+ record.setField("ts", ts);
+ })
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * This override is needed because {@link Comparators} used by {@link StructLikeWrapper}
+ * retrieves the Timestamp type using Long type as inner class, while the {@link
+ * RandomGenericData} generates {@link LocalDateTime} for {@code TimestampType.withoutZone()}.
+ * This method normalizes the {@link LocalDateTime} to a Long type so that Comparators can
+ * continue to work.
+ */
+ @Override
+ protected void assertRecords(Table table, List<Record> expectedRecords, Duration timeout)
+ throws Exception {
+ List<Record> expectedNormalized = convertLocalDateTimeToMilli(expectedRecords);
+ Awaitility.await("expected list of records should be produced")
+ .atMost(timeout)
+ .untilAsserted(
+ () -> {
+ SimpleDataUtil.equalsRecords(
+ expectedNormalized,
+ convertLocalDateTimeToMilli(SimpleDataUtil.tableRecords(table)),
+ table.schema());
+ SimpleDataUtil.assertRecordsEqual(
+ expectedNormalized,
+ convertLocalDateTimeToMilli(SimpleDataUtil.tableRecords(table)),
+ table.schema());
+ });
+ }
+
+ private List<Record> convertLocalDateTimeToMilli(List<Record> records) {
+ return records.stream()
+ .peek(
+ r -> {
+ LocalDateTime localDateTime = ((LocalDateTime) r.getField("ts"));
+ r.setField("ts",
localDateTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli());
+ })
+ .collect(Collectors.toList());
+ }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
new file mode 100644
index 0000000000..7547323871
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+ private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
+ private static final int PARALLELISM = 4;
+ private static final String SOURCE_NAME = "IcebergSource";
+ private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+ private static final ConcurrentMap<Long, Integer> windows = Maps.newConcurrentMap();
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ @Rule
+ public final MiniClusterWithClientResource miniClusterResource =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .setConfiguration(reporter.addToConfiguration(new Configuration()))
+ .withHaLeadershipControl()
+ .build());
+
+ @Rule
+ public final HadoopTableResource sourceTableResource =
+ new HadoopTableResource(
+ TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.TS_SCHEMA);
+
+ /**
+ * This is an integration test for watermark handling and windowing. Integration testing the
+ * following features:
+ *
+ * <ul>
+ * <li>- Ordering of the splits
+ * <li>- Emitting of watermarks
+ * <li>- Firing windows based on watermarks
+ * </ul>
+ *
+ * <p>The test generates 4 splits
+ *
+ * <ul>
+ * <li>- Split 1 - Watermark 100 min
+ * <li>- Split 2, 3 - Watermark 0 min
+ * <li>- Split 4 - Watermark 6 min
+ * </ul>
+ *
+ * <p>Creates a source with a 5 minute tumbling window and parallelism 1 (to prevent concurrency
+ * issues).
+ *
+ * <p>Checks that the windows are handled correctly based on the emitted watermarks, and that the
+ * splits are read in the following order:
+ *
+ * <ul>
+ * <li>- Split 2, 3
+ * <li>- Split 4
+ * <li>- Split 1
+ * </ul>
+ *
+ * <p>As a result the window aggregator emits the records based on the Split 2-3 and Split 4 data.
+ *
+ * <p>Adds 2 more splits, so the task manager closes the windows for the original 4 splits and
+ * emits the appropriate aggregated records.
+ */
+ @Test
+ public void testWindowing() throws Exception {
+ GenericAppenderHelper dataAppender = appender();
+ List<Record> expectedRecords = Lists.newArrayList();
+
+ // Generate records with the following pattern:
+ // - File 1 - Later records (Watermark 6000000)
+ // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
+ // - File 2 - First records (Watermark 0)
+ // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+ // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+ // - File 3 - Parallel write for the first records (Watermark 360000)
+ // - Split 1 - 2 records (6, "file_3-recordTs_6"), (7, "file_3-recordTs_7")
+ List<Record> batch =
+ ImmutableList.of(
+ generateRecord(100, "file_1-recordTs_100"),
+ generateRecord(101, "file_1-recordTs_101"),
+ generateRecord(103, "file_1-recordTs_103"));
+ expectedRecords.addAll(batch);
+ dataAppender.appendToTable(batch);
+
+ batch = Lists.newArrayListWithCapacity(100);
+ for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+ // Generate records where the timestamps are out of order, but still between 0-5 minutes
+ batch.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
+ }
+ expectedRecords.addAll(batch);
+ dataAppender.appendToTable(batch);
+
+ batch =
+ ImmutableList.of(
+ generateRecord(6, "file_3-recordTs_6"), generateRecord(7, "file_3-recordTs_7"));
+ expectedRecords.addAll(batch);
+ dataAppender.appendToTable(batch);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DataStream<RowData> stream =
+ env.fromSource(
+ source(),
+ WatermarkStrategy.<RowData>noWatermarks()
+ .withTimestampAssigner(new RowDataTimestampAssigner()),
+ SOURCE_NAME,
+ TypeInformation.of(RowData.class));
+
+ stream
+ .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
+ .apply(
+ new AllWindowFunction<RowData, RowData, TimeWindow>() {
+ @Override
+ public void apply(
+ TimeWindow window, Iterable<RowData> values, Collector<RowData> out) {
+ // Emit RowData which contains the window start time, and the record count in
+ // that window
+ AtomicInteger count = new AtomicInteger(0);
+ values.forEach(a -> count.incrementAndGet());
+ out.collect(row(window.getStart(), count.get()));
+ windows.put(window.getStart(), count.get());
+ }
+ });
+
+ // Use static variable to collect the windows, since other solutions were flaky
+ windows.clear();
+ env.executeAsync("Iceberg Source Windowing Test");
+
+ // Wait for the 2 first windows from File 2 and File 3
+ Awaitility.await()
+ .pollInterval(Duration.ofMillis(10))
+ .atMost(30, TimeUnit.SECONDS)
+ .until(
+ () ->
+ windows.equals(
+ ImmutableMap.of(0L, RECORD_NUM_FOR_2_SPLITS, TimeUnit.MINUTES.toMillis(5), 2)));
+
+ // Write data so the windows containing test data are closed
+ dataAppender.appendToTable(
+ dataAppender.writeFile(ImmutableList.of(generateRecord(1500, "last-record"))));
+
+ // Wait for last test record window from File 1
+ Awaitility.await()
+ .pollInterval(Duration.ofMillis(10))
+ .atMost(30, TimeUnit.SECONDS)
+ .until(
+ () ->
+ windows.equals(
+ ImmutableMap.of(
+ 0L,
+ RECORD_NUM_FOR_2_SPLITS,
+ TimeUnit.MINUTES.toMillis(5),
+ 2,
+ TimeUnit.MINUTES.toMillis(100),
+ 3)));
+ }
+
+ /**
+ * This is an integration test for watermark handling and throttling. Integration testing the
+ * following:
+ *
+ * <ul>
+ * <li>- Emitting of watermarks
+ * <li>- Watermark alignment
+ * </ul>
+ *
+ * <p>The test generates 3 splits
+ *
+ * <ul>
+ * <li>- Split 1 - Watermark 100 min
+ * <li>- Split 2, 3 - Watermark 0 min
+ * </ul>
+ *
+ * The splits are read in the following order:
+ *
+ * <ul>
+ * <li>- Split 2, 3 (Task Manager 1, Task Manager 2)
+ * <li>- Split 1 (Task Manager 1 or Task Manager 2 depending on scheduling)
+ * </ul>
+ *
+ * Reading split 1 will cause the watermark alignment to pause reading for the given task
+ * manager.
+ *
+ * <p>The status of the watermark alignment is checked by the alignment related metrics.
+ *
+ * <p>Adding new records with old timestamps to the table will enable the running reader to
+ * continue reading the files, but the watermark alignment will still prevent the paused reader
+ * from continuing.
+ *
+ * <p>After adding some records with new timestamps the blocked reader is un-paused, and both of
+ * the readers continue reading.
+ */
+ @Test
+ public void testThrottling() throws Exception {
+ GenericAppenderHelper dataAppender = appender();
+
+ // Generate records with the following pattern:
+ // - File 1 - Later records (Watermark 6000000)
+ // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
+ // - File 2 - First records (Watermark 0)
+ // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+ // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+ List<Record> batch =
+ ImmutableList.of(
+ generateRecord(100, "file_1-recordTs_100"), generateRecord(103, "file_1-recordTs_103"));
+ dataAppender.appendToTable(batch);
+
+ batch = Lists.newArrayListWithCapacity(100);
+ for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+ batch.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
+ }
+
+ dataAppender.appendToTable(batch);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+
+ DataStream<RowData> stream =
+ env.fromSource(
+ source(),
+ WatermarkStrategy.<RowData>noWatermarks()
+ .withWatermarkAlignment("iceberg", Duration.ofMinutes(20), Duration.ofMillis(10)),
+ SOURCE_NAME,
+ TypeInformation.of(RowData.class));
+
+ // Flink 1.15 only change - start
+ CollectResultIterator<RowData> resultStream = addCollectSink(stream);
+
+ // Start the job
+ JobClient jobClient = env.executeAsync("Iceberg Source Throttling Test");
+ resultStream.setJobClient(jobClient);
+ try (CollectResultIterator<RowData> resultIterator = resultStream) {
+ // Flink 1.15 only change - end
+
+ // Check that we read the non-blocked data
+ // The first RECORD_NUM_FOR_2_SPLITS records should be read
+ // 1 or more records from the runaway reader may arrive depending on thread scheduling
+ waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1);
+
+ // Get the drift metric, wait for it to be created and reach the expected state
+ // (100 min - 20 min - 0 min)
+ // Also this validates that the WatermarkAlignment is working
+ Awaitility.await()
+ .pollInterval(Duration.ofMillis(10))
+ .atMost(30, TimeUnit.SECONDS)
+ .until(
+ () ->
+ findAlignmentDriftMetric(jobClient.getJobID(), TimeUnit.MINUTES.toMillis(80))
+ .isPresent());
+ Gauge<Long> drift =
+ findAlignmentDriftMetric(jobClient.getJobID(), TimeUnit.MINUTES.toMillis(80)).get();
+
+ // Add some old records with 2 splits, so even if the blocked reader gets one split, the
+ // other reader gets one as well
+ List<Record> newBatch1 =
+ ImmutableList.of(
+ generateRecord(15, "file_3-recordTs_15"),
+ generateRecord(16, "file_3-recordTs_16"),
+ generateRecord(17, "file_3-recordTs_17"));
+ List<Record> newBatch2 =
+ ImmutableList.of(
+ generateRecord(15, "file_4-recordTs_15"),
+ generateRecord(16, "file_4-recordTs_16"),
+ generateRecord(17, "file_4-recordTs_17"));
+ dataAppender.appendToTable(
+ dataAppender.writeFile(newBatch1), dataAppender.writeFile(newBatch2));
+ // The records received will highly depend on scheduling
+ // We minimally get 3 records from the non-blocked reader
+ // We might get 1 record from the blocked reader (as part of the previous batch -
+ // file_1-recordTs_100)
+ // We might get 3 records from the non-blocked reader if it gets both new splits
+ waitForRecords(resultIterator, 3);
+
+ // Get the drift metric, wait for it to be created and reach the expected state
+ // (100 min - 20 min - 15 min)
+ Awaitility.await()
+ .pollInterval(Duration.ofMillis(10))
+ .atMost(30, TimeUnit.SECONDS)
+ .until(() -> drift.getValue() == TimeUnit.MINUTES.toMillis(65));
+
+ // Add some new records which should unblock the throttled reader
+ batch =
+ ImmutableList.of(
+ generateRecord(90, "file_5-recordTs_90"), generateRecord(91, "file_5-recordTs_91"));
+ dataAppender.appendToTable(batch);
+ // We should get all the records at this point
+ waitForRecords(resultIterator, 6);
+
+ // Wait for the new drift to decrease below the allowed drift to signal the normal state
+ Awaitility.await()
+ .pollInterval(Duration.ofMillis(10))
+ .atMost(30, TimeUnit.SECONDS)
+ .until(() -> drift.getValue() < TimeUnit.MINUTES.toMillis(20));
+ }
+ }
+
+ protected IcebergSource<RowData> source() {
+ return IcebergSource.<RowData>builder()
+ .tableLoader(sourceTableResource.tableLoader())
+ .watermarkColumn("ts")
+ .project(TestFixtures.TS_SCHEMA)
+ .splitSize(100L)
+ .streaming(true)
+ .monitorInterval(Duration.ofMillis(2))
+ .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+ .build();
+ }
+
+ protected Record generateRecord(int minutes, String str) {
+ // Override the ts field to create a more realistic situation for event time alignment
+ Record record = GenericRecord.create(TestFixtures.TS_SCHEMA);
+ LocalDateTime ts =
+ LocalDateTime.ofInstant(
+ Instant.ofEpochMilli(Time.of(minutes, TimeUnit.MINUTES).toMilliseconds()),
+ ZoneId.of("Z"));
+ record.setField("ts", ts);
+ record.setField("str", str);
+ return record;
+ }
+
+ protected void assertRecords(
+ Collection<Record> expectedRecords, CloseableIterator<RowData> iterator) throws Exception {
+ Set<RowData> expected =
+ expectedRecords.stream()
+ .map(e -> RowDataConverter.convert(TestFixtures.TS_SCHEMA, e))
+ .collect(Collectors.toSet());
+ Assert.assertEquals(expected, waitForRecords(iterator, expectedRecords.size()));
+ }
+
+ protected Set<RowData> waitForRecords(CloseableIterator<RowData> iterator, int num) {
+ Set<RowData> received = Sets.newHashSetWithExpectedSize(num);
+ assertThat(
+ CompletableFuture.supplyAsync(
+ () -> {
+ int count = 0;
+ while (count < num && iterator.hasNext()) {
+ received.add(iterator.next());
+ count++;
+ }
+
+ if (count < num) {
+ throw new IllegalStateException(String.format("Fail to get %d records.", num));
+ }
+
+ return true;
+ }))
+ .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
+
+ return received;
+ }
+
+ private Optional<Gauge<Long>> findAlignmentDriftMetric(JobID jobID, long withValue) {
+ String metricsName = SOURCE_NAME + ".*" + MetricNames.WATERMARK_ALIGNMENT_DRIFT;
+ return reporter.findMetrics(jobID, metricsName).values().stream()
+ .map(m -> (Gauge<Long>) m)
+ .filter(m -> m.getValue() == withValue)
+ .findFirst();
+ }
+
+ private GenericAppenderHelper appender() {
+ // We need to create multiple splits, so we need to generate parquet files with multiple offsets
+ org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+ hadoopConf.set("write.parquet.page-size-bytes", "64");
+ hadoopConf.set("write.parquet.row-group-size-bytes", "64");
+ return new GenericAppenderHelper(
+ sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER, hadoopConf);
+ }
+
+ private static RowData row(long time, long count) {
+ GenericRowData result = new GenericRowData(2);
+ result.setField(0, time);
+ result.setField(1, String.valueOf(count));
+ return result;
+ }
+
+ private static class RowDataTimestampAssigner implements SerializableTimestampAssigner<RowData> {
+ @Override
+ public long extractTimestamp(RowData element, long recordTimestamp) {
+ return element.getTimestamp(0, 0).getMillisecond();
+ }
+ }
+
+ // Flink 1.15 only method
+ private CollectResultIterator<RowData> addCollectSink(DataStream<RowData> stream) {
+ TypeSerializer<RowData> serializer =
+ stream.getType().createSerializer(stream.getExecutionConfig());
+ String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+ CollectSinkOperatorFactory<RowData> factory =
+ new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+ CollectSinkOperator<RowData> operator = (CollectSinkOperator<RowData>) factory.getOperator();
+ CollectStreamSink<RowData> sink = new CollectStreamSink<>(stream, factory);
+ sink.name("Data stream collect sink");
+ stream.getExecutionEnvironment().addOperator(sink.getTransformation());
+ return new CollectResultIterator<>(
+ operator.getOperatorIdFuture(),
+ serializer,
+ accumulatorName,
+ stream.getExecutionEnvironment().getCheckpointConfig());
+ }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
index f28677ca9d..090b304942 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
@@ -43,15 +43,13 @@ public abstract class SplitAssignerTestBase {
@Test
public void testStaticEnumeratorSequence() throws Exception {
SplitAssigner assigner = splitAssigner();
- assigner.onDiscoveredSplits(
- SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 1));
+ assigner.onDiscoveredSplits(createSplits(4, 1, "1"));
assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
assertSnapshot(assigner, 1);
- assigner.onUnassignedSplits(
- SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1));
+ assigner.onUnassignedSplits(createSplits(1, 1, "1"));
assertSnapshot(assigner, 2);
assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
@@ -66,15 +64,12 @@ public abstract class SplitAssignerTestBase {
SplitAssigner assigner = splitAssigner();
assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
- List<IcebergSourceSplit> splits1 =
- SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ List<IcebergSourceSplit> splits1 = createSplits(1, 1, "1");
assertAvailableFuture(assigner, 1, () -> assigner.onDiscoveredSplits(splits1));
- List<IcebergSourceSplit> splits2 =
- SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ List<IcebergSourceSplit> splits2 = createSplits(1, 1, "1");
assertAvailableFuture(assigner, 1, () -> assigner.onUnassignedSplits(splits2));
- assigner.onDiscoveredSplits(
- SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 2, 1));
+ assigner.onDiscoveredSplits(createSplits(2, 1, "1"));
assertSnapshot(assigner, 2);
assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
@@ -125,5 +120,11 @@ public abstract class SplitAssignerTestBase {
Assert.assertEquals(splitCount, stateBeforeGet.size());
}
+ protected List<IcebergSourceSplit> createSplits(int fileCount, int filesPerSplit, String version)
+ throws Exception {
+ return SplitHelpers.createSplitsFromTransientHadoopTable(
+ TEMPORARY_FOLDER, fileCount, filesPerSplit, version);
+ }
+
protected abstract SplitAssigner splitAssigner();
}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
index 8b9e132e0e..e78634e6b8 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.flink.source.assigner;
import java.util.List;
import org.apache.iceberg.ContentFile;
-import org.apache.iceberg.flink.source.SplitHelpers;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;
@@ -40,9 +39,7 @@ public class TestFileSequenceNumberBasedSplitAssigner extends SplitAssignerTestB
public void testMultipleFilesInAnIcebergSplit() {
SplitAssigner assigner = splitAssigner();
Assertions.assertThatThrownBy(
- () ->
- assigner.onDiscoveredSplits(
- SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 2, "2")),
+ () -> assigner.onDiscoveredSplits(createSplits(4, 2, "2")),
"Multiple files in a split is not allowed")
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Please use 'split-open-file-cost'");
@@ -52,8 +49,7 @@ public class TestFileSequenceNumberBasedSplitAssigner extends SplitAssignerTestB
@Test
public void testSplitSort() throws Exception {
SplitAssigner assigner = splitAssigner();
- List<IcebergSourceSplit> splits =
- SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 5, 1, "2");
+ List<IcebergSourceSplit> splits = createSplits(5, 1, "2");
assigner.onDiscoveredSplits(splits.subList(3, 5));
assigner.onDiscoveredSplits(splits.subList(0, 1));
@@ -76,7 +72,7 @@ public class TestFileSequenceNumberBasedSplitAssigner extends SplitAssignerTestB
Assert.assertNotNull(comparator);
}
- protected void assertGetNext(SplitAssigner assigner, Long expectedSequenceNumber) {
+ private void assertGetNext(SplitAssigner assigner, Long expectedSequenceNumber) {
GetSplitResult result = assigner.getNext(null);
ContentFile file = result.split().task().files().iterator().next().file();
Assert.assertEquals(expectedSequenceNumber, file.fileSequenceNumber());
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java
new file mode 100644
index 0000000000..e1fc63fda9
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
+import org.apache.iceberg.flink.source.reader.ReaderUtil;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
+import org.apache.iceberg.flink.source.split.SplitComparators;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SerializationUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestWatermarkBasedSplitAssigner extends SplitAssignerTestBase {
+ public static final Schema SCHEMA =
+ new Schema(required(1, "timestamp_column", Types.TimestampType.withoutZone()));
+ private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA);
+
+ @Override
+ protected SplitAssigner splitAssigner() {
+ return new OrderedSplitAssignerFactory(
+ SplitComparators.watermark(
+ new ColumnStatsWatermarkExtractor(SCHEMA, "timestamp_column", null)))
+ .createAssigner();
+ }
+
+ /** Test the assigner when multiple files are in a single split */
+ @Test
+ public void testMultipleFilesInAnIcebergSplit() {
+ SplitAssigner assigner = splitAssigner();
+ assigner.onDiscoveredSplits(createSplits(4, 2, "2"));
+
+ assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+ assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+ assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
+ }
+
+ /** Test sorted splits */
+ @Test
+ public void testSplitSort() {
+ SplitAssigner assigner = splitAssigner();
+
+ Instant now = Instant.now();
+ List<IcebergSourceSplit> splits =
+ IntStream.range(0, 5)
+ .mapToObj(i -> splitFromInstant(now.plus(i, ChronoUnit.MINUTES)))
+ .collect(Collectors.toList());
+
+ assigner.onDiscoveredSplits(splits.subList(3, 5));
+ assigner.onDiscoveredSplits(splits.subList(0, 1));
+ assigner.onDiscoveredSplits(splits.subList(1, 3));
+
+ assertGetNext(assigner, splits.get(0));
+ assertGetNext(assigner, splits.get(1));
+ assertGetNext(assigner, splits.get(2));
+ assertGetNext(assigner, splits.get(3));
+ assertGetNext(assigner, splits.get(4));
+
+ assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
+ }
+
+ @Test
+ public void testSerializable() {
+ byte[] bytes =
+ SerializationUtil.serializeToBytes(
+ SplitComparators.watermark(
+ new ColumnStatsWatermarkExtractor(
+ TestFixtures.SCHEMA, "id", TimeUnit.MILLISECONDS)));
+ SerializableComparator<IcebergSourceSplit> comparator =
+ SerializationUtil.deserializeFromBytes(bytes);
+ Assert.assertNotNull(comparator);
+ }
+
+ private void assertGetNext(SplitAssigner assigner, IcebergSourceSplit split)
{
+ GetSplitResult result = assigner.getNext(null);
+ Assert.assertEquals(result.split(), split);
+ }
+
+ @Override
+ protected List<IcebergSourceSplit> createSplits(
+ int fileCount, int filesPerSplit, String version) {
+ return IntStream.range(0, fileCount / filesPerSplit)
+ .mapToObj(
+ splitNum ->
+ splitFromRecords(
+ IntStream.range(0, filesPerSplit)
+ .mapToObj(
+ fileNum ->
+ RandomGenericData.generate(
+ SCHEMA, 2, splitNum * filesPerSplit +
fileNum))
+ .collect(Collectors.toList())))
+ .collect(Collectors.toList());
+ }
+
+ private IcebergSourceSplit splitFromInstant(Instant instant) {
+ Record record = GenericRecord.create(SCHEMA);
+ record.set(0, LocalDateTime.ofInstant(instant, ZoneOffset.UTC));
+ return splitFromRecords(ImmutableList.of(ImmutableList.of(record)));
+ }
+
+ private IcebergSourceSplit splitFromRecords(List<List<Record>> records) {
+ try {
+ return IcebergSourceSplit.fromCombinedScanTask(
+ ReaderUtil.createCombinedScanTask(
+ records, TEMPORARY_FOLDER, FileFormat.PARQUET,
APPENDER_FACTORY));
+ } catch (IOException e) {
+ throw new RuntimeException("Split creation exception", e);
+ }
+ }
+}
diff --git
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
index f9ceaf8422..2a2503ef24 100644
---
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
+++
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
@@ -60,9 +60,12 @@ public class ReaderUtil {
FileFormat fileFormat,
FileAppenderFactory<Record> appenderFactory)
throws IOException {
- try (FileAppender<Record> appender =
- appenderFactory.newAppender(Files.localOutput(file), fileFormat)) {
+ FileAppender<Record> appender =
+ appenderFactory.newAppender(Files.localOutput(file), fileFormat);
+ try {
appender.addAll(records);
+ } finally {
+ appender.close();
}
DataFile dataFile =
@@ -71,6 +74,7 @@ public class ReaderUtil {
.withFileSizeInBytes(file.length())
.withPath(file.toString())
.withFormat(fileFormat)
+ .withMetrics(appender.metrics())
.build();
ResidualEvaluator residuals =
ResidualEvaluator.unpartitioned(Expressions.alwaysTrue());
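The ReaderUtil change above unrolls the try-with-resources into an explicit try/finally because file metrics are only finalized on close, and the appender reference must stay in scope so appender.metrics() can be called when building the DataFile. A minimal sketch of the pattern, using a hypothetical Appender type for illustration only:

interface Appender extends AutoCloseable {
  void add(String record);

  String metrics(); // only valid after close()
}

class MetricsAfterClose {
  static String writeAndReport(Appender appender, Iterable<String> records) throws Exception {
    try {
      for (String record : records) {
        appender.add(record);
      }
    } finally {
      // Close before reading metrics; close() finalizes them even if add() threw
      appender.close();
    }

    // The reference is still in scope here, unlike a variable declared in a
    // try-with-resources clause
    return appender.metrics();
  }
}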
diff --git
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
new file mode 100644
index 0000000000..afe8a5d015
--- /dev/null
+++
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestColumnStatsWatermarkExtractor {
+ public static final Schema SCHEMA =
+ new Schema(
+ required(1, "timestamp_column", Types.TimestampType.withoutZone()),
+ required(2, "timestamptz_column", Types.TimestampType.withZone()),
+ required(3, "long_column", Types.LongType.get()),
+ required(4, "string_column", Types.StringType.get()));
+
+ private static final GenericAppenderFactory APPENDER_FACTORY = new
GenericAppenderFactory(SCHEMA);
+
+ private static final List<List<Record>> TEST_RECORDS =
+ ImmutableList.of(
+ RandomGenericData.generate(SCHEMA, 3, 2L),
RandomGenericData.generate(SCHEMA, 3, 19L));
+
+ private static final List<Map<String, Long>> MIN_VALUES =
+ ImmutableList.of(Maps.newHashMapWithExpectedSize(3),
Maps.newHashMapWithExpectedSize(3));
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+
+ @Rule
+ public final HadoopTableResource sourceTableResource =
+ new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE,
TestFixtures.TABLE, SCHEMA);
+
+ private final String columnName;
+
+ @BeforeClass
+ public static void updateMinValue() {
+ for (int i = 0; i < TEST_RECORDS.size(); ++i) {
+ for (Record r : TEST_RECORDS.get(i)) {
+ Map<String, Long> minValues = MIN_VALUES.get(i);
+
+ LocalDateTime localDateTime = (LocalDateTime) r.get(0);
+ minValues.merge(
+ "timestamp_column",
localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli(), Math::min);
+
+ OffsetDateTime offsetDateTime = (OffsetDateTime) r.get(1);
+ minValues.merge("timestamptz_column",
offsetDateTime.toInstant().toEpochMilli(), Math::min);
+
+ minValues.merge("long_column", (Long) r.get(2), Math::min);
+ }
+ }
+ }
+
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection<Object[]> data() {
+ return ImmutableList.of(
+ new Object[] {"timestamp_column"},
+ new Object[] {"timestamptz_column"},
+ new Object[] {"long_column"});
+ }
+
+ public TestColumnStatsWatermarkExtractor(String columnName) {
+ this.columnName = columnName;
+ }
+
+ @Test
+ public void testSingle() throws IOException {
+ ColumnStatsWatermarkExtractor extractor =
+ new ColumnStatsWatermarkExtractor(SCHEMA, columnName,
TimeUnit.MILLISECONDS);
+
+ Assert.assertEquals(
+ MIN_VALUES.get(0).get(columnName).longValue(),
extractor.extractWatermark(split(0)));
+ }
+
+ @Test
+ public void testTimeUnit() throws IOException {
+ Assume.assumeTrue("Run only for long column",
columnName.equals("long_column"));
+ ColumnStatsWatermarkExtractor extractor =
+ new ColumnStatsWatermarkExtractor(SCHEMA, columnName,
TimeUnit.MICROSECONDS);
+
+ Assert.assertEquals(
+ MIN_VALUES.get(0).get(columnName).longValue() / 1000L,
+ extractor.extractWatermark(split(0)));
+ }
+
+ @Test
+ public void testMultipleFiles() throws IOException {
+ Assume.assumeTrue("Run only for the timestamp column",
columnName.equals("timestamp_column"));
+ IcebergSourceSplit combinedSplit =
+ IcebergSourceSplit.fromCombinedScanTask(
+ ReaderUtil.createCombinedScanTask(
+ TEST_RECORDS, TEMPORARY_FOLDER, FileFormat.PARQUET,
APPENDER_FACTORY));
+
+ ColumnStatsWatermarkExtractor extractor =
+ new ColumnStatsWatermarkExtractor(SCHEMA, columnName, null);
+
+ Assert.assertEquals(
+ MIN_VALUES.get(0).get(columnName).longValue(),
extractor.extractWatermark(split(0)));
+ Assert.assertEquals(
+ MIN_VALUES.get(1).get(columnName).longValue(),
extractor.extractWatermark(split(1)));
+ Assert.assertEquals(
+ Math.min(MIN_VALUES.get(0).get(columnName),
MIN_VALUES.get(1).get(columnName)),
+ extractor.extractWatermark(combinedSplit));
+ }
+
+ @Test
+ public void testWrongColumn() {
+ Assume.assumeTrue("Run only for string column",
columnName.equals("string_column"));
+ Assertions.assertThatThrownBy(() -> new
ColumnStatsWatermarkExtractor(SCHEMA, columnName, null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "Found STRING, expected a LONG or TIMESTAMP column for watermark
generation.");
+ }
+
+ @Test
+ public void testEmptyStatistics() throws IOException {
+ Assume.assumeTrue("Run only for timestamp column",
columnName.equals("timestamp_column"));
+
+ // Create an extractor for a column for which we have no statistics
+ ColumnStatsWatermarkExtractor extractor =
+ new ColumnStatsWatermarkExtractor(10, "missing_field");
+ Assertions.assertThatThrownBy(() -> extractor.extractWatermark(split(0)))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Missing statistics for column");
+ }
+
+ private IcebergSourceSplit split(int id) throws IOException {
+ return IcebergSourceSplit.fromCombinedScanTask(
+ ReaderUtil.createCombinedScanTask(
+ ImmutableList.of(TEST_RECORDS.get(id)),
+ TEMPORARY_FOLDER,
+ FileFormat.PARQUET,
+ APPENDER_FACTORY));
+ }
+}
diff --git
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
index def4f43685..88234c6112 100644
---
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
+++
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
@@ -167,7 +167,12 @@ public class TestIcebergSourceReader {
new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
new PlaintextEncryptionManager(),
Collections.emptyList());
- return new IcebergSourceReader<>(readerMetrics, readerFunction,
splitComparator, readerContext);
+ return new IcebergSourceReader<>(
+ SerializableRecordEmitter.defaultEmitter(),
+ readerMetrics,
+ readerFunction,
+ splitComparator,
+ readerContext);
}
private static class IdBasedComparator implements
SerializableComparator<IcebergSourceSplit> {
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index f85f627726..179253cb3a 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.source.Boundedness;
@@ -58,15 +59,20 @@ import
org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
import
org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
+import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
import org.apache.iceberg.flink.source.reader.MetaDataReaderFunction;
import org.apache.iceberg.flink.source.reader.ReaderFunction;
import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
+import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter;
+import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
import org.apache.iceberg.flink.source.split.SerializableComparator;
+import org.apache.iceberg.flink.source.split.SplitComparators;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +86,7 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
private final ReaderFunction<T> readerFunction;
private final SplitAssignerFactory assignerFactory;
private final SerializableComparator<IcebergSourceSplit> splitComparator;
+ private final SerializableRecordEmitter<T> emitter;
// Can't use SerializableTable as enumerator needs a regular table
// that can discover table changes
@@ -91,13 +98,15 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
ReaderFunction<T> readerFunction,
SplitAssignerFactory assignerFactory,
SerializableComparator<IcebergSourceSplit> splitComparator,
- Table table) {
+ Table table,
+ SerializableRecordEmitter<T> emitter) {
this.tableLoader = tableLoader;
this.scanContext = scanContext;
this.readerFunction = readerFunction;
this.assignerFactory = assignerFactory;
this.splitComparator = splitComparator;
this.table = table;
+ this.emitter = emitter;
}
String name() {
@@ -152,7 +161,8 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext
readerContext) {
IcebergSourceReaderMetrics metrics =
new IcebergSourceReaderMetrics(readerContext.metricGroup(),
lazyTable().name());
- return new IcebergSourceReader<>(metrics, readerFunction, splitComparator,
readerContext);
+ return new IcebergSourceReader<>(
+ emitter, metrics, readerFunction, splitComparator, readerContext);
}
@Override
@@ -216,6 +226,8 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
private Table table;
private SplitAssignerFactory splitAssignerFactory;
private SerializableComparator<IcebergSourceSplit> splitComparator;
+ private String watermarkColumn;
+ private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS;
private ReaderFunction<T> readerFunction;
private ReadableConfig flinkConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -237,6 +249,9 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
}
public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
+ Preconditions.checkArgument(
+ watermarkColumn == null,
+ "Watermark column and SplitAssigner should not be set in the same
source");
this.splitAssignerFactory = assignerFactory;
return this;
}
@@ -429,6 +444,33 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
return this;
}
+ /**
+ * Emits watermarks once per split, based on the min value of the column statistics from the
+ * file metadata in the given split. The generated watermarks are also used for ordering the
+ * splits for read. Accepted column types are timestamp/timestamptz/long. For long columns
+ * consider setting {@link #watermarkTimeUnit(TimeUnit)}.
+ *
+ * <p>Consider setting `read.split.open-file-cost` to prevent combining small files into a
+ * single split when the watermark is used for watermark alignment.
+ */
+ public Builder<T> watermarkColumn(String columnName) {
+ Preconditions.checkArgument(
+ splitAssignerFactory == null,
+ "Watermark column and SplitAssigner should not be set in the same
source");
+ this.watermarkColumn = columnName;
+ return this;
+ }
+
+ /**
+ * When the type of the {@link #watermarkColumn} is {@link
+ * org.apache.iceberg.types.Types.LongType}, then sets the {@link
TimeUnit} to convert the
+ * value. The default value is {@link TimeUnit#MICROSECONDS}.
+ */
+ public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) {
+ this.watermarkTimeUnit = timeUnit;
+ return this;
+ }
+
/** @deprecated Use {@link #setAll} instead. */
@Deprecated
public Builder<T> properties(Map<String, String> properties) {
@@ -453,6 +495,18 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema,
projectedFlinkSchema));
}
+ SerializableRecordEmitter<T> emitter =
SerializableRecordEmitter.defaultEmitter();
+ if (watermarkColumn != null) {
+ // Column statistics are needed for watermark generation
+ contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));
+
+ SplitWatermarkExtractor watermarkExtractor =
+ new ColumnStatsWatermarkExtractor(icebergSchema, watermarkColumn,
watermarkTimeUnit);
+ emitter =
SerializableRecordEmitter.emitterWithWatermark(watermarkExtractor);
+ splitAssignerFactory =
+ new
OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor));
+ }
+
ScanContext context = contextBuilder.build();
if (readerFunction == null) {
if (table instanceof BaseMetadataTable) {
@@ -485,8 +539,14 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
checkRequired();
// Since builder already load the table, pass it to the source to avoid
double loading
- return new IcebergSource<T>(
- tableLoader, context, readerFunction, splitAssignerFactory,
splitComparator, table);
+ return new IcebergSource<>(
+ tableLoader,
+ context,
+ readerFunction,
+ splitAssignerFactory,
+ splitComparator,
+ table,
+ emitter);
}
private void checkRequired() {
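The watermarkColumn and watermarkTimeUnit additions above are the whole user-facing surface of this feature. A minimal usage sketch, assuming a table with a timestamp column named "ts" and a Hadoop warehouse path (the path and names are illustrative, not part of the patch):

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

public class WatermarkSourceSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/events");

    IcebergSource<RowData> source =
        IcebergSource.<RowData>builder()
            .tableLoader(tableLoader)
            // Orders splits and emits per-split watermarks from the column statistics
            .watermarkColumn("ts")
            // Only used for LONG columns; TIMESTAMP columns always use microseconds
            .watermarkTimeUnit(TimeUnit.MILLISECONDS)
            .build();

    env.fromSource(
            source,
            // No extra watermark generation; the source emits watermarks per split
            WatermarkStrategy.noWatermarks(),
            "iceberg-source",
            TypeInformation.of(RowData.class))
        .print();

    env.execute("Iceberg watermark sketch");
  }
}

Note that assignerFactory(...) and watermarkColumn(...) are mutually exclusive; when the watermark column is set, the builder installs an OrderedSplitAssignerFactory with the watermark comparator itself.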
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java
new file mode 100644
index 0000000000..4bb6f0a98c
--- /dev/null
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type.TypeID;
+import org.apache.iceberg.types.Types;
+
+/**
+ * {@link SplitWatermarkExtractor} implementation which uses Iceberg timestamp column statistics
+ * to get the watermarks for the {@link IcebergSourceSplit}. This watermark is
emitted by the {@link
+ * WatermarkExtractorRecordEmitter} along with the actual records.
+ */
+@Internal
+public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor,
Serializable {
+ private final int eventTimeFieldId;
+ private final String eventTimeFieldName;
+ private final TimeUnit timeUnit;
+
+ /**
+ * Creates the extractor.
+ *
+ * @param schema The schema of the Table
+ * @param eventTimeFieldName The column which should be used as an event time
+ * @param timeUnit Used for converting the long value to epoch milliseconds
+ */
+ public ColumnStatsWatermarkExtractor(
+ Schema schema, String eventTimeFieldName, TimeUnit timeUnit) {
+ Types.NestedField field = schema.findField(eventTimeFieldName);
+ TypeID typeID = field.type().typeId();
+ Preconditions.checkArgument(
+ typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP),
+ "Found %s, expected a LONG or TIMESTAMP column for watermark
generation.",
+ typeID);
+ this.eventTimeFieldId = field.fieldId();
+ this.eventTimeFieldName = eventTimeFieldName;
+ // Use the timeUnit only for Long columns.
+ this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit :
TimeUnit.MICROSECONDS;
+ }
+
+ @VisibleForTesting
+ ColumnStatsWatermarkExtractor(int eventTimeFieldId, String
eventTimeFieldName) {
+ this.eventTimeFieldId = eventTimeFieldId;
+ this.eventTimeFieldName = eventTimeFieldName;
+ this.timeUnit = TimeUnit.MICROSECONDS;
+ }
+
+ /**
+ * Get the watermark for a split using column statistics.
+ *
+ * @param split The split
+ * @return The watermark
+ * @throws IllegalArgumentException if there is no statistics for the column
+ */
+ @Override
+ public long extractWatermark(IcebergSourceSplit split) {
+ return split.task().files().stream()
+ .map(
+ scanTask -> {
+ Preconditions.checkArgument(
+ scanTask.file().lowerBounds() != null
+ && scanTask.file().lowerBounds().get(eventTimeFieldId)
!= null,
+ "Missing statistics for column name = %s in file = %s",
+ eventTimeFieldName,
+ eventTimeFieldId,
+ scanTask.file());
+ return timeUnit.toMillis(
+ Conversions.fromByteBuffer(
+ Types.LongType.get(),
scanTask.file().lowerBounds().get(eventTimeFieldId)));
+ })
+ .min(Comparator.comparingLong(l -> l))
+ .get();
+ }
+}
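The extractor can also be used directly, for instance when experimenting with a custom assigner. A short sketch, under the assumption that split is an already planned IcebergSourceSplit and the schema contains a LONG column named event_time holding epoch seconds (both hypothetical):

import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

class ExtractorSketch {
  static long minEventTimeMillis(Schema schema, IcebergSourceSplit split) {
    // TimeUnit.SECONDS converts the LONG lower bounds to epoch milliseconds
    ColumnStatsWatermarkExtractor extractor =
        new ColumnStatsWatermarkExtractor(schema, "event_time", TimeUnit.SECONDS);

    // Throws IllegalArgumentException when the file statistics are missing
    return extractor.extractWatermark(split);
  }
}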
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
index 8d7d68f961..f143b8d2df 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
@@ -35,13 +35,14 @@ public class IcebergSourceReader<T>
RecordAndPosition<T>, T, IcebergSourceSplit, IcebergSourceSplit> {
public IcebergSourceReader(
+ SerializableRecordEmitter<T> emitter,
IcebergSourceReaderMetrics metrics,
ReaderFunction<T> readerFunction,
SerializableComparator<IcebergSourceSplit> splitComparator,
SourceReaderContext context) {
super(
() -> new IcebergSourceSplitReader<>(metrics, readerFunction,
splitComparator, context),
- new IcebergSourceRecordEmitter<>(),
+ emitter,
context.getConfiguration(),
context);
}
diff --git
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/SerializableRecordEmitter.java
similarity index 61%
copy from
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
copy to
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/SerializableRecordEmitter.java
index 337d9d3c42..a6e2c1dae2 100644
---
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/SerializableRecordEmitter.java
@@ -18,19 +18,23 @@
*/
package org.apache.iceberg.flink.source.reader;
-import org.apache.flink.api.connector.source.SourceOutput;
+import java.io.Serializable;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
-final class IcebergSourceRecordEmitter<T>
- implements RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit> {
-
- IcebergSourceRecordEmitter() {}
+@Internal
+@FunctionalInterface
+public interface SerializableRecordEmitter<T>
+ extends RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit>,
Serializable {
+ static <T> SerializableRecordEmitter<T> defaultEmitter() {
+ return (element, output, split) -> {
+ output.collect(element.record());
+ split.updatePosition(element.fileOffset(), element.recordOffset());
+ };
+ }
- @Override
- public void emitRecord(
- RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit
split) {
- output.collect(element.record());
- split.updatePosition(element.fileOffset(), element.recordOffset());
+ static <T> SerializableRecordEmitter<T>
emitterWithWatermark(SplitWatermarkExtractor extractor) {
+ return new WatermarkExtractorRecordEmitter<>(extractor);
}
}
diff --git
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/SplitWatermarkExtractor.java
similarity index 63%
rename from
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
rename to
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/SplitWatermarkExtractor.java
index 337d9d3c42..d1c50ac8ca 100644
---
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceRecordEmitter.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/SplitWatermarkExtractor.java
@@ -18,19 +18,11 @@
*/
package org.apache.iceberg.flink.source.reader;
-import org.apache.flink.api.connector.source.SourceOutput;
-import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import java.io.Serializable;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
-final class IcebergSourceRecordEmitter<T>
- implements RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit> {
-
- IcebergSourceRecordEmitter() {}
-
- @Override
- public void emitRecord(
- RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit
split) {
- output.collect(element.record());
- split.updatePosition(element.fileOffset(), element.recordOffset());
- }
+/** The interface used to extract watermarks from splits. */
+public interface SplitWatermarkExtractor extends Serializable {
+ /** Get the watermark for a split. */
+ long extractWatermark(IcebergSourceSplit split);
}
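Since SplitWatermarkExtractor is a one-method serializable interface, alternative strategies are straightforward to sketch. For illustration only (the patch ships ColumnStatsWatermarkExtractor as the sole implementation), a hypothetical extractor that orders splits by their smallest file sequence number:

import java.util.Comparator;
import java.util.Objects;
import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

class FileSequenceWatermarkExtractor implements SplitWatermarkExtractor {
  @Override
  public long extractWatermark(IcebergSourceSplit split) {
    // Use the smallest file sequence number in the split as a stand-in watermark
    return split.task().files().stream()
        .map(task -> task.file().fileSequenceNumber())
        .filter(Objects::nonNull)
        .min(Comparator.naturalOrder())
        .orElseThrow(() -> new IllegalArgumentException("No file sequence numbers in split"));
  }
}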
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java
new file mode 100644
index 0000000000..02ef57d344
--- /dev/null
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Emitter which emits the watermarks and records, and updates the split position.
+ *
+ * <p>The emitter emits a watermark at the beginning of every split, using the value provided
+ * by the {@link SplitWatermarkExtractor}.
+ */
+class WatermarkExtractorRecordEmitter<T> implements
SerializableRecordEmitter<T> {
+ private static final Logger LOG =
LoggerFactory.getLogger(WatermarkExtractorRecordEmitter.class);
+ private final SplitWatermarkExtractor timeExtractor;
+ private String lastSplitId = null;
+ private long watermark;
+
+ WatermarkExtractorRecordEmitter(SplitWatermarkExtractor timeExtractor) {
+ this.timeExtractor = timeExtractor;
+ }
+
+ @Override
+ public void emitRecord(
+ RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit
split) {
+ if (!split.splitId().equals(lastSplitId)) {
+ long newWatermark = timeExtractor.extractWatermark(split);
+ if (newWatermark < watermark) {
+ LOG.info(
+ "Received a new split with lower watermark. Previous watermark =
{}, current watermark = {}, previous split = {}, current split = {}",
+ watermark,
+ newWatermark,
+ lastSplitId,
+ split.splitId());
+ } else {
+ watermark = newWatermark;
+ output.emitWatermark(new Watermark(watermark));
+ LOG.debug("Watermark = {} emitted based on split = {}", watermark,
lastSplitId);
+ }
+
+ lastSplitId = split.splitId();
+ }
+
+ output.collect(element.record());
+ split.updatePosition(element.fileOffset(), element.recordOffset());
+ }
+}
diff --git
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
index 64e03d77de..56ee92014d 100644
---
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
+++
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.source.split;
+import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
/**
@@ -45,7 +46,7 @@ public class SplitComparators {
o1);
Preconditions.checkNotNull(
seq2,
- "IInvalid file sequence number: null. Doesn't support splits written
with V1 format: %s",
+ "Invalid file sequence number: null. Doesn't support splits written
with V1 format: %s",
o2);
int temp = Long.compare(seq1, seq2);
@@ -56,4 +57,20 @@ public class SplitComparators {
}
};
}
+
+ /** Comparator which orders the splits based on their watermarks. */
+ public static SerializableComparator<IcebergSourceSplit> watermark(
+ SplitWatermarkExtractor watermarkExtractor) {
+ return (IcebergSourceSplit o1, IcebergSourceSplit o2) -> {
+ long watermark1 = watermarkExtractor.extractWatermark(o1);
+ long watermark2 = watermarkExtractor.extractWatermark(o2);
+
+ int temp = Long.compare(watermark1, watermark2);
+ if (temp != 0) {
+ return temp;
+ } else {
+ return o1.splitId().compareTo(o2.splitId());
+ }
+ };
+ }
}
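SplitComparators.watermark(...) is what the builder wires in automatically when a watermark column is configured; the equivalent manual wiring, mirroring the TestWatermarkBasedSplitAssigner setup earlier in this patch, would be a sketch like the following (assuming the schema has a timestamp column named "ts"):

import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.source.assigner.OrderedSplitAssignerFactory;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;

class WatermarkAssignerSketch {
  static SplitAssigner assignerFor(Schema schema) {
    SerializableComparator<IcebergSourceSplit> comparator =
        SplitComparators.watermark(
            new ColumnStatsWatermarkExtractor(schema, "ts", TimeUnit.MICROSECONDS));

    // Splits are handed out in ascending watermark order; ties break on split id
    return new OrderedSplitAssignerFactory(comparator).createAssigner();
  }
}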
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
index 70e7a79d83..7d991ee603 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
@@ -97,6 +98,11 @@ public class TestIcebergSourceFailover {
return RandomGenericData.generate(schema(), numRecords, seed);
}
+ protected void assertRecords(Table table, List<Record> expectedRecords,
Duration timeout)
+ throws Exception {
+ SimpleDataUtil.assertTableRecords(table, expectedRecords, timeout);
+ }
+
@Test
public void testBoundedWithTaskManagerFailover() throws Exception {
testBoundedIcebergSource(FailoverType.TM);
@@ -150,8 +156,7 @@ public class TestIcebergSourceFailover {
RecordCounterToFail::continueProcessing,
miniClusterResource.getMiniCluster());
- SimpleDataUtil.assertTableRecords(
- sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+ assertRecords(sinkTableResource.table(), expectedRecords,
Duration.ofSeconds(120));
}
@Test
@@ -214,8 +219,7 @@ public class TestIcebergSourceFailover {
// wait longer for continuous source to reduce flakiness
// because CI servers tend to be overloaded.
- SimpleDataUtil.assertTableRecords(
- sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
+ assertRecords(sinkTableResource.table(), expectedRecords,
Duration.ofSeconds(120));
}
// ------------------------------------------------------------------------
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java
new file mode 100644
index 0000000000..f7dc931c50
--- /dev/null
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.awaitility.Awaitility;
+
+public class TestIcebergSourceFailoverWithWatermarkExtractor extends
TestIcebergSourceFailover {
+ // Increment ts by 15 minutes for each generateRecords batch
+ private static final long RECORD_BATCH_TS_INCREMENT_MILLI =
TimeUnit.MINUTES.toMillis(15);
+ // Within a batch, increment ts by 1 second
+ private static final long RECORD_TS_INCREMENT_MILLI =
TimeUnit.SECONDS.toMillis(1);
+
+ private final AtomicLong tsMilli = new
AtomicLong(System.currentTimeMillis());
+
+ @Override
+ protected IcebergSource.Builder<RowData> sourceBuilder() {
+ return IcebergSource.<RowData>builder()
+ .tableLoader(sourceTableResource.tableLoader())
+ .watermarkColumn("ts")
+ .project(TestFixtures.TS_SCHEMA);
+ }
+
+ @Override
+ protected Schema schema() {
+ return TestFixtures.TS_SCHEMA;
+ }
+
+ @Override
+ protected List<Record> generateRecords(int numRecords, long seed) {
+ // Override the ts field to create a more realistic situation for event
time alignment
+ tsMilli.addAndGet(RECORD_BATCH_TS_INCREMENT_MILLI);
+ return RandomGenericData.generate(schema(), numRecords, seed).stream()
+ .peek(
+ record -> {
+ LocalDateTime ts =
+ LocalDateTime.ofInstant(
+
Instant.ofEpochMilli(tsMilli.addAndGet(RECORD_TS_INCREMENT_MILLI)),
+ ZoneId.of("Z"));
+ record.setField("ts", ts);
+ })
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * This override is needed because the {@link Comparators} used by {@link StructLikeWrapper}
+ * represent the Timestamp type internally as a Long, while {@link RandomGenericData} generates
+ * {@link LocalDateTime} for {@code TimestampType.withoutZone()}. This method normalizes the
+ * {@link LocalDateTime} to a Long so that the Comparators continue to work.
+ */
+ @Override
+ protected void assertRecords(Table table, List<Record> expectedRecords,
Duration timeout)
+ throws Exception {
+ List<Record> expectedNormalized =
convertLocalDateTimeToMilli(expectedRecords);
+ Awaitility.await("expected list of records should be produced")
+ .atMost(timeout)
+ .untilAsserted(
+ () -> {
+ SimpleDataUtil.equalsRecords(
+ expectedNormalized,
+
convertLocalDateTimeToMilli(SimpleDataUtil.tableRecords(table)),
+ table.schema());
+ SimpleDataUtil.assertRecordsEqual(
+ expectedNormalized,
+
convertLocalDateTimeToMilli(SimpleDataUtil.tableRecords(table)),
+ table.schema());
+ });
+ }
+
+ private List<Record> convertLocalDateTimeToMilli(List<Record> records) {
+ return records.stream()
+ .peek(
+ r -> {
+ LocalDateTime localDateTime = ((LocalDateTime) r.getField("ts"));
+ r.setField("ts",
localDateTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli());
+ })
+ .collect(Collectors.toList());
+ }
+}
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
new file mode 100644
index 0000000000..0bb2eb7766
--- /dev/null
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
+ private static final InMemoryReporter reporter =
InMemoryReporter.createWithRetainedMetrics();
+ private static final int PARALLELISM = 4;
+ private static final String SOURCE_NAME = "IcebergSource";
+ private static final int RECORD_NUM_FOR_2_SPLITS = 200;
+ private static final ConcurrentMap<Long, Integer> windows =
Maps.newConcurrentMap();
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+
+ @Rule
+ public final MiniClusterWithClientResource miniClusterResource =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .setConfiguration(reporter.addToConfiguration(new
Configuration()))
+ .withHaLeadershipControl()
+ .build());
+
+ @Rule
+ public final HadoopTableResource sourceTableResource =
+ new HadoopTableResource(
+ TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE,
TestFixtures.TS_SCHEMA);
+
+ /**
+ * This is an integration test for watermark handling and windowing.
Integration testing the
+ * following features:
+ *
+ * <ul>
+ * <li>- Ordering of the splits
+ * <li>- Emitting of watermarks
+ * <li>- Firing windows based on watermarks
+ * </ul>
+ *
+ * <p>The test generates 4 splits
+ *
+ * <ul>
+ * <li>- Split 1 - Watermark 100 min
+ * <li>- Split 2, 3 - Watermark 0 min
+ * <li>- Split 4 - Watermark 6 min
+ * </ul>
+ *
+ * <p>Creates a source with 5 minutes tumbling window with parallelism 1 (to
prevent concurrency
+ * issues).
+ *
+ * <p>Checks that windows are handled correctly based on the emitted
watermarks, and splits are
+ * read in the following order:
+ *
+ * <ul>
+ * <li>- Split 2, 3
+ * <li>- Split 4
+ * <li>- Split 1
+ * </ul>
+ *
+ * <p>As a result the window aggregator emits the records based on the Split 2-3 and Split 4
+ * data.
+ *
+ * <p>Adding 2 more splits makes the task manager close the windows for the original 4 splits
+ * and emit the appropriate aggregated records.
+ */
+ @Test
+ public void testWindowing() throws Exception {
+ GenericAppenderHelper dataAppender = appender();
+ List<Record> expectedRecords = Lists.newArrayList();
+
+ // Generate records with the following pattern:
+ // - File 1 - Later records (Watermark 6000000)
+ // - Split 1 - 3 records (100, "file_1-recordTs_100"), (101, "file_1-recordTs_101"),
+ // (103, "file_1-recordTs_103")
+ // - File 2 - First records (Watermark 0)
+ // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1,
"file_2-recordTs_1"),...
+ // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1,
"file_2-recordTs_1"),...
+ // - File 3 - Parallel write for the first records (Watermark 360000)
+ // - Split 1 - 2 records (6, "file_3-recordTs_6"), (7,
"file_3-recordTs_7")
+ List<Record> batch =
+ ImmutableList.of(
+ generateRecord(100, "file_1-recordTs_100"),
+ generateRecord(101, "file_1-recordTs_101"),
+ generateRecord(103, "file_1-recordTs_103"));
+ expectedRecords.addAll(batch);
+ dataAppender.appendToTable(batch);
+
+ batch = Lists.newArrayListWithCapacity(100);
+ for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+ // Generate records where the timestamps are out of order, but still
between 0-5 minutes
+ batch.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
+ }
+ expectedRecords.addAll(batch);
+ dataAppender.appendToTable(batch);
+
+ batch =
+ ImmutableList.of(
+ generateRecord(6, "file_3-recordTs_6"), generateRecord(7,
"file_3-recordTs_7"));
+ expectedRecords.addAll(batch);
+ dataAppender.appendToTable(batch);
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DataStream<RowData> stream =
+ env.fromSource(
+ source(),
+ WatermarkStrategy.<RowData>noWatermarks()
+ .withTimestampAssigner(new RowDataTimestampAssigner()),
+ SOURCE_NAME,
+ TypeInformation.of(RowData.class));
+
+ stream
+ .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
+ .apply(
+ new AllWindowFunction<RowData, RowData, TimeWindow>() {
+ @Override
+ public void apply(
+ TimeWindow window, Iterable<RowData> values,
Collector<RowData> out) {
+ // Emit RowData which contains the window start time, and the
record count in
+ // that window
+ AtomicInteger count = new AtomicInteger(0);
+ values.forEach(a -> count.incrementAndGet());
+ out.collect(row(window.getStart(), count.get()));
+ windows.put(window.getStart(), count.get());
+ }
+ });
+
+ // Use a static variable to collect the windows, since other solutions were flaky
+ windows.clear();
+ env.executeAsync("Iceberg Source Windowing Test");
+
+ // Wait for the 2 first windows from File 2 and File 3
+ Awaitility.await()
+ .pollInterval(Duration.ofMillis(10))
+ .atMost(30, TimeUnit.SECONDS)
+ .until(
+ () ->
+ windows.equals(
+ ImmutableMap.of(0L, RECORD_NUM_FOR_2_SPLITS,
TimeUnit.MINUTES.toMillis(5), 2)));
+
+ // Write data so the windows containing test data are closed
+ dataAppender.appendToTable(
+ dataAppender.writeFile(ImmutableList.of(generateRecord(1500,
"last-record"))));
+
+ // Wait for last test record window from File 1
+ Awaitility.await()
+ .pollInterval(Duration.ofMillis(10))
+ .atMost(30, TimeUnit.SECONDS)
+ .until(
+ () ->
+ windows.equals(
+ ImmutableMap.of(
+ 0L,
+ RECORD_NUM_FOR_2_SPLITS,
+ TimeUnit.MINUTES.toMillis(5),
+ 2,
+ TimeUnit.MINUTES.toMillis(100),
+ 3)));
+ }
+
+ /**
+ * This is an integration test for watermark handling and throttling.
Integration testing the
+ * following:
+ *
+ * <ul>
+ * <li>- Emitting of watermarks
+ * <li>- Watermark alignment
+ * </ul>
+ *
+ * <p>The test generates 3 splits
+ *
+ * <ul>
+ * <li>- Split 1 - Watermark 100 min
+ * <li>- Split 2, 3 - Watermark 0 min
+ * </ul>
+ *
+ * The splits are read in the following order:
+ *
+ * <ul>
+ * <li>- Split 2, 3 (Task Manager 1, Task Manager 2)
+ * <li>- Split 1 (Task Manager 1 or Task Manager 2 depending on scheduling)
+ * </ul>
+ *
+ * Reading split 1 will cause the watermark alignment to pause reading for the given task
+ * manager.
+ *
+ * <p>The status of the watermark alignment is checked via the alignment-related metrics.
+ *
+ * <p>Adding new records with old timestamps to the table will enable the running reader to
+ * continue reading the files, but the watermark alignment will still prevent the paused
+ * reader from continuing.
+ *
+ * <p>After adding some records with new timestamps the blocked reader is un-paused, and both
+ * of the readers continue reading.
+ */
+ @Test
+ public void testThrottling() throws Exception {
+ GenericAppenderHelper dataAppender = appender();
+
+ // Generate records with the following pattern:
+ // - File 1 - Later records (Watermark 6000000)
+ // - Split 1 - 2 records (100, "file_1-recordTs_100"), (103,
"file_1-recordTs_103")
+ // - File 2 - First records (Watermark 0)
+ // - Split 1 - 100 records (0, "file_2-recordTs_0"), (1,
"file_2-recordTs_1"),...
+ // - Split 2 - 100 records (0, "file_2-recordTs_0"), (1,
"file_2-recordTs_1"),...
+ List<Record> batch =
+ ImmutableList.of(
+ generateRecord(100, "file_1-recordTs_100"), generateRecord(103,
"file_1-recordTs_103"));
+ dataAppender.appendToTable(batch);
+
+ batch = Lists.newArrayListWithCapacity(100);
+ for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
+ batch.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
+ }
+
+ dataAppender.appendToTable(batch);
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+
+ DataStream<RowData> stream =
+ env.fromSource(
+ source(),
+ WatermarkStrategy.<RowData>noWatermarks()
+ .withWatermarkAlignment("iceberg", Duration.ofMinutes(20),
Duration.ofMillis(10)),
+ SOURCE_NAME,
+ TypeInformation.of(RowData.class));
+
+ try (CloseableIterator<RowData> resultIterator = stream.collectAsync()) {
+ JobClient jobClient = env.executeAsync("Iceberg Source Throttling Test");
+
+ // Check that we read the non-blocked data
+ // The first RECORD_NUM_FOR_2_SPLITS records should be read
+ // 1 or more records from the runaway reader may arrive, depending on thread scheduling
+ waitForRecords(resultIterator, RECORD_NUM_FOR_2_SPLITS + 1);
+
+ // Get the drift metric, wait for it to be created and reach the
expected state
+ // (100 min - 20 min - 0 min)
+ // This also validates that the watermark alignment is working
+ Awaitility.await()
+ .pollInterval(Duration.ofMillis(10))
+ .atMost(30, TimeUnit.SECONDS)
+ .until(
+ () ->
+ findAlignmentDriftMetric(jobClient.getJobID(),
TimeUnit.MINUTES.toMillis(80))
+ .isPresent());
+ Gauge<Long> drift =
+ findAlignmentDriftMetric(jobClient.getJobID(),
TimeUnit.MINUTES.toMillis(80)).get();
+
+ // Add some old records with 2 splits, so even if the blocked reader gets one split, the
+ // other reader gets one as well
+ List<Record> newBatch1 =
+ ImmutableList.of(
+ generateRecord(15, "file_3-recordTs_15"),
+ generateRecord(16, "file_3-recordTs_16"),
+ generateRecord(17, "file_3-recordTs_17"));
+ List<Record> newBatch2 =
+ ImmutableList.of(
+ generateRecord(15, "file_4-recordTs_15"),
+ generateRecord(16, "file_4-recordTs_16"),
+ generateRecord(17, "file_4-recordTs_17"));
+ dataAppender.appendToTable(
+ dataAppender.writeFile(newBatch1),
dataAppender.writeFile(newBatch2));
+ // The records received will depend heavily on scheduling
+ // We minimally get 3 records from the non-blocked reader
+ // We might get 1 record from the blocked reader (as part of the previous batch -
+ // file_1-recordTs_100)
+ // We might get 3 more records from the non-blocked reader if it gets both new splits
+ waitForRecords(resultIterator, 3);
+
+ // Get the drift metric, wait for it to be created and reach the
expected state (100 min - 20
+ // min - 15 min)
+ Awaitility.await()
+ .pollInterval(Duration.ofMillis(10))
+ .atMost(30, TimeUnit.SECONDS)
+ .until(() -> drift.getValue() == TimeUnit.MINUTES.toMillis(65));
+
+ // Add some new records which should unblock the throttled reader
+ batch =
+ ImmutableList.of(
+ generateRecord(90, "file_5-recordTs_90"), generateRecord(91,
"file_5-recordTs_91"));
+ dataAppender.appendToTable(batch);
+ // We should get all the records at this point
+ waitForRecords(resultIterator, 6);
+
+ // Wait for the new drift to decrease below the allowed drift to signal
the normal state
+ Awaitility.await()
+ .pollInterval(Duration.ofMillis(10))
+ .atMost(30, TimeUnit.SECONDS)
+ .until(() -> drift.getValue() < TimeUnit.MINUTES.toMillis(20));
+ }
+ }
+
+ protected IcebergSource<RowData> source() {
+ return IcebergSource.<RowData>builder()
+ .tableLoader(sourceTableResource.tableLoader())
+ .watermarkColumn("ts")
+ .project(TestFixtures.TS_SCHEMA)
+ .splitSize(100L)
+ .streaming(true)
+ .monitorInterval(Duration.ofMillis(2))
+
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+ .build();
+ }
+
+ protected Record generateRecord(int minutes, String str) {
+ // Override the ts field to create a more realistic situation for event
time alignment
+ Record record = GenericRecord.create(TestFixtures.TS_SCHEMA);
+ LocalDateTime ts =
+ LocalDateTime.ofInstant(
+ Instant.ofEpochMilli(Time.of(minutes,
TimeUnit.MINUTES).toMilliseconds()),
+ ZoneId.of("Z"));
+ record.setField("ts", ts);
+ record.setField("str", str);
+ return record;
+ }
+
+ protected void assertRecords(
+ Collection<Record> expectedRecords, CloseableIterator<RowData> iterator)
throws Exception {
+ Set<RowData> expected =
+ expectedRecords.stream()
+ .map(e -> RowDataConverter.convert(TestFixtures.TS_SCHEMA, e))
+ .collect(Collectors.toSet());
+ Assert.assertEquals(expected, waitForRecords(iterator,
expectedRecords.size()));
+ }
+
+ protected Set<RowData> waitForRecords(CloseableIterator<RowData> iterator,
int num) {
+ Set<RowData> received = Sets.newHashSetWithExpectedSize(num);
+ assertThat(
+ CompletableFuture.supplyAsync(
+ () -> {
+ int count = 0;
+ while (count < num && iterator.hasNext()) {
+ received.add(iterator.next());
+ count++;
+ }
+
+ if (count < num) {
+ throw new IllegalStateException(String.format("Failed to get %d records.", num));
+ }
+
+ return true;
+ }))
+ .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
+
+ return received;
+ }
+
+ private Optional<Gauge<Long>> findAlignmentDriftMetric(JobID jobID, long
withValue) {
+ String metricsName = SOURCE_NAME + ".*" +
MetricNames.WATERMARK_ALIGNMENT_DRIFT;
+ return reporter.findMetrics(jobID, metricsName).values().stream()
+ .map(m -> (Gauge<Long>) m)
+ .filter(m -> m.getValue() == withValue)
+ .findFirst();
+ }
+
+ private GenericAppenderHelper appender() {
+ // We need to create multiple splits, so we need to generate parquet files
with multiple offsets
+ org.apache.hadoop.conf.Configuration hadoopConf = new
org.apache.hadoop.conf.Configuration();
+ hadoopConf.set("write.parquet.page-size-bytes", "64");
+ hadoopConf.set("write.parquet.row-group-size-bytes", "64");
+ return new GenericAppenderHelper(
+ sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER,
hadoopConf);
+ }
+
+ private static RowData row(long time, long count) {
+ GenericRowData result = new GenericRowData(2);
+ result.setField(0, time);
+ result.setField(1, String.valueOf(count));
+ return result;
+ }
+
+ private static class RowDataTimestampAssigner implements
SerializableTimestampAssigner<RowData> {
+ @Override
+ public long extractTimestamp(RowData element, long recordTimestamp) {
+ return element.getTimestamp(0, 0).getMillisecond();
+ }
+ }
+}
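
For context: the drift gauge asserted above is driven by Flink's watermark alignment, which this source opts into through watermarkColumn(). A minimal sketch of wiring the source into an aligned job follows; the table path, group name, and bounds are illustrative only and not part of this commit.

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.source.IcebergSource;

    public class WatermarkAlignedReadSketch {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Hypothetical table location; replace with a real warehouse path.
        TableLoader loader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/tbl");

        IcebergSource<RowData> source =
            IcebergSource.forRowData()
                .tableLoader(loader)
                .watermarkColumn("ts") // derive split watermarks from the "ts" column stats
                .streaming(true)
                .build();

        env.fromSource(
                source,
                // Keep subtasks within 20 minutes of each other, mirroring the
                // drift bound exercised by the test above.
                WatermarkStrategy.<RowData>forMonotonousTimestamps()
                    .withWatermarkAlignment("iceberg", Duration.ofMinutes(20), Duration.ofMillis(1)),
                "iceberg-source")
            .print();

        env.execute("watermark-aligned-read");
      }
    }
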
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
index f28677ca9d..090b304942 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/SplitAssignerTestBase.java
@@ -43,15 +43,13 @@ public abstract class SplitAssignerTestBase {
@Test
public void testStaticEnumeratorSequence() throws Exception {
SplitAssigner assigner = splitAssigner();
- assigner.onDiscoveredSplits(
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 1));
+ assigner.onDiscoveredSplits(createSplits(4, 1, "1"));
assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
assertSnapshot(assigner, 1);
- assigner.onUnassignedSplits(
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1));
+ assigner.onUnassignedSplits(createSplits(1, 1, "1"));
assertSnapshot(assigner, 2);
assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
@@ -66,15 +64,12 @@ public abstract class SplitAssignerTestBase {
SplitAssigner assigner = splitAssigner();
assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
- List<IcebergSourceSplit> splits1 =
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ List<IcebergSourceSplit> splits1 = createSplits(1, 1, "1");
    assertAvailableFuture(assigner, 1, () -> assigner.onDiscoveredSplits(splits1));
- List<IcebergSourceSplit> splits2 =
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ List<IcebergSourceSplit> splits2 = createSplits(1, 1, "1");
    assertAvailableFuture(assigner, 1, () -> assigner.onUnassignedSplits(splits2));
- assigner.onDiscoveredSplits(
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 2, 1));
+ assigner.onDiscoveredSplits(createSplits(2, 1, "1"));
assertSnapshot(assigner, 2);
assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
@@ -125,5 +120,11 @@ public abstract class SplitAssignerTestBase {
Assert.assertEquals(splitCount, stateBeforeGet.size());
}
+  protected List<IcebergSourceSplit> createSplits(int fileCount, int filesPerSplit, String version)
+ throws Exception {
+ return SplitHelpers.createSplitsFromTransientHadoopTable(
+ TEMPORARY_FOLDER, fileCount, filesPerSplit, version);
+ }
+
protected abstract SplitAssigner splitAssigner();
}
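
For orientation, the createSplits() and splitAssigner() hooks above are all a concrete test class needs to override; a hypothetical subclass (illustration only, not part of this commit) could run the inherited tests against an ordered assigner:

    public class TestOrderedAssignerExample extends SplitAssignerTestBase {
      @Override
      protected SplitAssigner splitAssigner() {
        // Hand out splits ordered by the data file's sequence number.
        return new OrderedSplitAssignerFactory(SplitComparators.fileSequenceNumber())
            .createAssigner();
      }
    }
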
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
index 8b9e132e0e..e78634e6b8 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestFileSequenceNumberBasedSplitAssigner.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.flink.source.assigner;
import java.util.List;
import org.apache.iceberg.ContentFile;
-import org.apache.iceberg.flink.source.SplitHelpers;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;
@@ -40,9 +39,7 @@ public class TestFileSequenceNumberBasedSplitAssigner extends SplitAssignerTestB
public void testMultipleFilesInAnIcebergSplit() {
SplitAssigner assigner = splitAssigner();
Assertions.assertThatThrownBy(
- () ->
- assigner.onDiscoveredSplits(
-                    SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 4, 2, "2")),
+ () -> assigner.onDiscoveredSplits(createSplits(4, 2, "2")),
"Multiple files in a split is not allowed")
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Please use 'split-open-file-cost'");
@@ -52,8 +49,7 @@ public class TestFileSequenceNumberBasedSplitAssigner extends SplitAssignerTestB
@Test
public void testSplitSort() throws Exception {
SplitAssigner assigner = splitAssigner();
- List<IcebergSourceSplit> splits =
-        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 5, 1, "2");
+ List<IcebergSourceSplit> splits = createSplits(5, 1, "2");
assigner.onDiscoveredSplits(splits.subList(3, 5));
assigner.onDiscoveredSplits(splits.subList(0, 1));
@@ -76,7 +72,7 @@ public class TestFileSequenceNumberBasedSplitAssigner extends SplitAssignerTestB
Assert.assertNotNull(comparator);
}
-  protected void assertGetNext(SplitAssigner assigner, Long expectedSequenceNumber) {
+  private void assertGetNext(SplitAssigner assigner, Long expectedSequenceNumber) {
GetSplitResult result = assigner.getNext(null);
ContentFile file = result.split().task().files().iterator().next().file();
Assert.assertEquals(expectedSequenceNumber, file.fileSequenceNumber());
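
Since SerializableComparator extends java.util.Comparator over whole splits, the comparator under test can be used directly wherever commit order matters; a small sketch (the helper name is illustrative):

    // Sort splits into commit order. Assumes one file per split, which is
    // exactly what SplitComparators.fileSequenceNumber() enforces.
    static void sortByCommitOrder(List<IcebergSourceSplit> splits) {
      splits.sort(SplitComparators.fileSequenceNumber());
    }
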
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java
new file mode 100644
index 0000000000..e1fc63fda9
--- /dev/null
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.assigner;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
+import org.apache.iceberg.flink.source.reader.ReaderUtil;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SerializableComparator;
+import org.apache.iceberg.flink.source.split.SplitComparators;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SerializationUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestWatermarkBasedSplitAssigner extends SplitAssignerTestBase {
+ public static final Schema SCHEMA =
+      new Schema(required(1, "timestamp_column", Types.TimestampType.withoutZone()));
+  private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA);
+
+ @Override
+ protected SplitAssigner splitAssigner() {
+ return new OrderedSplitAssignerFactory(
+ SplitComparators.watermark(
+                new ColumnStatsWatermarkExtractor(SCHEMA, "timestamp_column", null)))
+ .createAssigner();
+ }
+
+ /** Test the assigner when multiple files are in a single split */
+ @Test
+ public void testMultipleFilesInAnIcebergSplit() {
+ SplitAssigner assigner = splitAssigner();
+ assigner.onDiscoveredSplits(createSplits(4, 2, "2"));
+
+ assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+ assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
+ assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
+ }
+
+ /** Test sorted splits */
+ @Test
+ public void testSplitSort() {
+ SplitAssigner assigner = splitAssigner();
+
+ Instant now = Instant.now();
+ List<IcebergSourceSplit> splits =
+ IntStream.range(0, 5)
+ .mapToObj(i -> splitFromInstant(now.plus(i, ChronoUnit.MINUTES)))
+ .collect(Collectors.toList());
+
+ assigner.onDiscoveredSplits(splits.subList(3, 5));
+ assigner.onDiscoveredSplits(splits.subList(0, 1));
+ assigner.onDiscoveredSplits(splits.subList(1, 3));
+
+ assertGetNext(assigner, splits.get(0));
+ assertGetNext(assigner, splits.get(1));
+ assertGetNext(assigner, splits.get(2));
+ assertGetNext(assigner, splits.get(3));
+ assertGetNext(assigner, splits.get(4));
+
+ assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
+ }
+
+ @Test
+ public void testSerializable() {
+ byte[] bytes =
+ SerializationUtil.serializeToBytes(
+ SplitComparators.watermark(
+ new ColumnStatsWatermarkExtractor(
+ TestFixtures.SCHEMA, "id", TimeUnit.MILLISECONDS)));
+ SerializableComparator<IcebergSourceSplit> comparator =
+ SerializationUtil.deserializeFromBytes(bytes);
+ Assert.assertNotNull(comparator);
+ }
+
+  private void assertGetNext(SplitAssigner assigner, IcebergSourceSplit split) {
+ GetSplitResult result = assigner.getNext(null);
+ Assert.assertEquals(result.split(), split);
+ }
+
+ @Override
+ protected List<IcebergSourceSplit> createSplits(
+ int fileCount, int filesPerSplit, String version) {
+ return IntStream.range(0, fileCount / filesPerSplit)
+ .mapToObj(
+ splitNum ->
+ splitFromRecords(
+ IntStream.range(0, filesPerSplit)
+ .mapToObj(
+ fileNum ->
+ RandomGenericData.generate(
+                                    SCHEMA, 2, splitNum * filesPerSplit + fileNum))
+ .collect(Collectors.toList())))
+ .collect(Collectors.toList());
+ }
+
+ private IcebergSourceSplit splitFromInstant(Instant instant) {
+ Record record = GenericRecord.create(SCHEMA);
+ record.set(0, LocalDateTime.ofInstant(instant, ZoneOffset.UTC));
+ return splitFromRecords(ImmutableList.of(ImmutableList.of(record)));
+ }
+
+ private IcebergSourceSplit splitFromRecords(List<List<Record>> records) {
+ try {
+ return IcebergSourceSplit.fromCombinedScanTask(
+ ReaderUtil.createCombinedScanTask(
+              records, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY));
+ } catch (IOException e) {
+ throw new RuntimeException("Split creation exception", e);
+ }
+ }
+}
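
The assigner constructed in splitAssigner() above is effectively what watermarkColumn() sets up on the source; spelled out against the builder's splitAssignerFactory() hook, the wiring looks roughly like this (a sketch reusing the test's SCHEMA):

    // Hand out splits in ascending order of their minimum timestamp.
    SplitAssignerFactory assignerFactory =
        new OrderedSplitAssignerFactory(
            SplitComparators.watermark(
                new ColumnStatsWatermarkExtractor(SCHEMA, "timestamp_column", null)));
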
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
index f9ceaf8422..2a2503ef24 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
@@ -60,9 +60,12 @@ public class ReaderUtil {
FileFormat fileFormat,
FileAppenderFactory<Record> appenderFactory)
throws IOException {
- try (FileAppender<Record> appender =
- appenderFactory.newAppender(Files.localOutput(file), fileFormat)) {
+ FileAppender<Record> appender =
+ appenderFactory.newAppender(Files.localOutput(file), fileFormat);
+ try {
appender.addAll(records);
+ } finally {
+ appender.close();
}
DataFile dataFile =
@@ -71,6 +74,7 @@ public class ReaderUtil {
.withFileSizeInBytes(file.length())
.withPath(file.toString())
.withFormat(fileFormat)
+ .withMetrics(appender.metrics())
.build();
    ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(Expressions.alwaysTrue());
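
The ReaderUtil change above unrolls the try-with-resources deliberately: FileAppender metrics are only valid after close(), and the appender must stay in scope so the new withMetrics(appender.metrics()) call can attach the column statistics that ColumnStatsWatermarkExtractor reads. The pattern in miniature:

    // Close first, then read the finalized metrics.
    FileAppender<Record> appender = appenderFactory.newAppender(Files.localOutput(file), fileFormat);
    try {
      appender.addAll(records);
    } finally {
      appender.close();
    }
    Metrics metrics = appender.metrics(); // valid only after close()
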
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
new file mode 100644
index 0000000000..afe8a5d015
--- /dev/null
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestColumnStatsWatermarkExtractor {
+ public static final Schema SCHEMA =
+ new Schema(
+ required(1, "timestamp_column", Types.TimestampType.withoutZone()),
+ required(2, "timestamptz_column", Types.TimestampType.withZone()),
+ required(3, "long_column", Types.LongType.get()),
+ required(4, "string_column", Types.StringType.get()));
+
+  private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA);
+
+ private static final List<List<Record>> TEST_RECORDS =
+ ImmutableList.of(
+          RandomGenericData.generate(SCHEMA, 3, 2L), RandomGenericData.generate(SCHEMA, 3, 19L));
+
+ private static final List<Map<String, Long>> MIN_VALUES =
+      ImmutableList.of(Maps.newHashMapWithExpectedSize(3), Maps.newHashMapWithExpectedSize(3));
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ @Rule
+ public final HadoopTableResource sourceTableResource =
+      new HadoopTableResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, SCHEMA);
+
+ private final String columnName;
+
+ @BeforeClass
+ public static void updateMinValue() {
+ for (int i = 0; i < TEST_RECORDS.size(); ++i) {
+ for (Record r : TEST_RECORDS.get(i)) {
+ Map<String, Long> minValues = MIN_VALUES.get(i);
+
+ LocalDateTime localDateTime = (LocalDateTime) r.get(0);
+ minValues.merge(
+ "timestamp_column",
localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli(), Math::min);
+
+ OffsetDateTime offsetDateTime = (OffsetDateTime) r.get(1);
+ minValues.merge("timestamptz_column",
offsetDateTime.toInstant().toEpochMilli(), Math::min);
+
+ minValues.merge("long_column", (Long) r.get(2), Math::min);
+ }
+ }
+ }
+
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection<Object[]> data() {
+ return ImmutableList.of(
+ new Object[] {"timestamp_column"},
+ new Object[] {"timestamptz_column"},
+ new Object[] {"long_column"});
+ }
+
+ public TestColumnStatsWatermarkExtractor(String columnName) {
+ this.columnName = columnName;
+ }
+
+ @Test
+ public void testSingle() throws IOException {
+ ColumnStatsWatermarkExtractor extractor =
+        new ColumnStatsWatermarkExtractor(SCHEMA, columnName, TimeUnit.MILLISECONDS);
+
+ Assert.assertEquals(
+        MIN_VALUES.get(0).get(columnName).longValue(), extractor.extractWatermark(split(0)));
+ }
+
+ @Test
+ public void testTimeUnit() throws IOException {
+ Assume.assumeTrue("Run only for long column",
columnName.equals("long_column"));
+ ColumnStatsWatermarkExtractor extractor =
+        new ColumnStatsWatermarkExtractor(SCHEMA, columnName, TimeUnit.MICROSECONDS);
+
+ Assert.assertEquals(
+ MIN_VALUES.get(0).get(columnName).longValue() / 1000L,
+ extractor.extractWatermark(split(0)));
+ }
+
+ @Test
+ public void testMultipleFiles() throws IOException {
+ Assume.assumeTrue("Run only for the timestamp column",
columnName.equals("timestamp_column"));
+ IcebergSourceSplit combinedSplit =
+ IcebergSourceSplit.fromCombinedScanTask(
+ ReaderUtil.createCombinedScanTask(
+                TEST_RECORDS, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY));
+
+ ColumnStatsWatermarkExtractor extractor =
+ new ColumnStatsWatermarkExtractor(SCHEMA, columnName, null);
+
+ Assert.assertEquals(
+        MIN_VALUES.get(0).get(columnName).longValue(), extractor.extractWatermark(split(0)));
+ Assert.assertEquals(
+        MIN_VALUES.get(1).get(columnName).longValue(), extractor.extractWatermark(split(1)));
+ Assert.assertEquals(
+        Math.min(MIN_VALUES.get(0).get(columnName), MIN_VALUES.get(1).get(columnName)),
+ extractor.extractWatermark(combinedSplit));
+ }
+
+ @Test
+ public void testWrongColumn() {
+ Assume.assumeTrue("Run only for string column",
columnName.equals("string_column"));
+    Assertions.assertThatThrownBy(() -> new ColumnStatsWatermarkExtractor(SCHEMA, columnName, null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "Found STRING, expected a LONG or TIMESTAMP column for watermark
generation.");
+ }
+
+ @Test
+ public void testEmptyStatistics() throws IOException {
+ Assume.assumeTrue("Run only for timestamp column",
columnName.equals("timestamp_column"));
+
+    // Create an extractor for a column for which we do not have statistics
+ ColumnStatsWatermarkExtractor extractor =
+ new ColumnStatsWatermarkExtractor(10, "missing_field");
+ Assertions.assertThatThrownBy(() -> extractor.extractWatermark(split(0)))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Missing statistics for column");
+ }
+
+ private IcebergSourceSplit split(int id) throws IOException {
+ return IcebergSourceSplit.fromCombinedScanTask(
+ ReaderUtil.createCombinedScanTask(
+ ImmutableList.of(TEST_RECORDS.get(id)),
+ TEMPORARY_FOLDER,
+ FileFormat.PARQUET,
+ APPENDER_FACTORY));
+ }
+}
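
As ColumnStatsWatermarkExtractor is new in this backport, a rough sketch of the idea it tests may help: the watermark of a split is derived from the column's lower-bound statistics in the file metadata. The helper below is illustrative only; the real class also handles timestamp types and time-unit conversion.

    // Illustrative only: minimum lower bound of a long column across the
    // files of a split, failing (as the tests expect) when stats are absent.
    static long minLowerBound(IcebergSourceSplit split, int fieldId) {
      long min = Long.MAX_VALUE;
      for (FileScanTask task : split.task().files()) {
        ByteBuffer lowerBound = task.file().lowerBounds().get(fieldId);
        Preconditions.checkArgument(lowerBound != null, "Missing statistics for column");
        Long value = Conversions.fromByteBuffer(Types.LongType.get(), lowerBound);
        min = Math.min(min, value);
      }
      return min;
    }
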
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
index def4f43685..88234c6112 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
@@ -167,7 +167,12 @@ public class TestIcebergSourceReader {
new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
new PlaintextEncryptionManager(),
Collections.emptyList());
-    return new IcebergSourceReader<>(readerMetrics, readerFunction, splitComparator, readerContext);
+ return new IcebergSourceReader<>(
+ SerializableRecordEmitter.defaultEmitter(),
+ readerMetrics,
+ readerFunction,
+ splitComparator,
+ readerContext);
}
  private static class IdBasedComparator implements SerializableComparator<IcebergSourceSplit> {
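
To close the loop on the reader change above: the new first constructor argument selects how records, and in watermark mode split watermarks, are emitted. A sketch of the two choices; defaultEmitter() appears in the diff, while the watermark-mode factory method name is assumed from the accompanying WatermarkExtractorRecordEmitter and may differ:

    // Plain pass-through emission, as used by this test fixture.
    SerializableRecordEmitter<RowData> plain = SerializableRecordEmitter.defaultEmitter();

    // Watermark-aware emission when a watermark column is configured
    // ("schema" and the factory method name are assumptions here).
    SerializableRecordEmitter<RowData> aligned =
        SerializableRecordEmitter.emitterWithWatermark(
            new ColumnStatsWatermarkExtractor(schema, "ts", null));
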