This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 8c7001eb46 Flink: Backport #9308 to v1.17 and the relevant parts to v1.16 (#9403)
8c7001eb46 is described below
commit 8c7001eb462a69641b3f0546b7efeddb63e501be
Author: pvary <[email protected]>
AuthorDate: Wed Jan 3 17:53:34 2024 +0100
Flink: Backport #9308 to v1.17 and the relevant parts to v1.16 (#9403)
---
.../source/reader/IcebergSourceSplitReader.java | 13 ++++++++++--
.../TestIcebergSourceWithWatermarkExtractor.java | 2 +-
.../source/reader/IcebergSourceSplitReader.java | 24 ++++++++++++++++++++--
.../TestIcebergSourceWithWatermarkExtractor.java | 7 ++-----
4 files changed, 36 insertions(+), 10 deletions(-)
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
index 4e270dfa3d..7ee75d2ade 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.flink.source.reader;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
@@ -35,6 +34,7 @@ import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,9 +60,18 @@ class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, I
this.openSplitFunction = openSplitFunction;
this.splitComparator = splitComparator;
this.indexOfSubtask = context.getIndexOfSubtask();
- this.splits = new ArrayDeque<>();
+ this.splits = Queues.newArrayDeque();
}
+ /**
+ * The method reads a batch of records from the assigned splits. If all the
records from the
+ * current split are returned then it will emit a {@link
ArrayBatchRecords#finishedSplit(String)}
+ * batch to signal this event. In the next fetch loop the reader will
continue with the next split
+ * (if any).
+ *
+ * @return The fetched records
+ * @throws IOException If there is an error during reading
+ */
@Override
public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
metrics.incrementSplitReaderFetchCalls(1);
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
index 7428d30f42..2ef8f79aa3 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
@@ -380,7 +380,7 @@ public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
.project(TestFixtures.TS_SCHEMA)
.splitSize(100L)
.streaming(true)
- .monitorInterval(Duration.ofMillis(2))
+ .monitorInterval(Duration.ofMillis(10))
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
}
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
index 4e270dfa3d..9c20494fdb 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
@@ -20,7 +20,7 @@ package org.apache.iceberg.flink.source.reader;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.util.ArrayDeque;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
@@ -35,6 +35,7 @@ import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,9 +61,18 @@ class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, I
this.openSplitFunction = openSplitFunction;
this.splitComparator = splitComparator;
this.indexOfSubtask = context.getIndexOfSubtask();
- this.splits = new ArrayDeque<>();
+ this.splits = Queues.newArrayDeque();
}
+ /**
+ * The method reads a batch of records from the assigned splits. If all the
records from the
+ * current split are returned then it will emit a {@link
ArrayBatchRecords#finishedSplit(String)}
+ * batch to signal this event. In the next fetch loop the reader will
continue with the next split
+ * (if any).
+ *
+ * @return The fetched records
+ * @throws IOException If there is an error during reading
+ */
@Override
public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
metrics.incrementSplitReaderFetchCalls(1);
@@ -123,6 +133,16 @@ class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, I
}
}
+ @Override
+ public void pauseOrResumeSplits(
+ Collection<IcebergSourceSplit> splitsToPause,
Collection<IcebergSourceSplit> splitsToResume) {
+ // IcebergSourceSplitReader only reads splits sequentially. When waiting
for watermark alignment
+ // the SourceOperator will stop processing and recycling the fetched
batches. This exhausts the
+ // {@link ArrayPoolDataIteratorBatcher#pool} and the
`currentReader.next()` call will be
+ // blocked even without split-level watermark alignment. Based on this the
+ // `pauseOrResumeSplits` and the `wakeUp` are left empty.
+ }
+
private long calculateBytes(IcebergSourceSplit split) {
return split.task().files().stream().map(FileScanTask::length).reduce(0L,
Long::sum);
}
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
index c4070212df..2ef8f79aa3 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
@@ -40,7 +40,6 @@ import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.metrics.MetricNames;
@@ -95,9 +94,7 @@ public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
- .setConfiguration(
- reporter.addToConfiguration(
- new
Configuration().set(PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS, true)))
+ .setConfiguration(reporter.addToConfiguration(new
Configuration()))
.withHaLeadershipControl()
.build());
@@ -383,7 +380,7 @@ public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
.project(TestFixtures.TS_SCHEMA)
.splitSize(100L)
.streaming(true)
- .monitorInterval(Duration.ofMillis(2))
+ .monitorInterval(Duration.ofMillis(10))
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
}