This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 8c7001eb46 Flink: Backport #9308 to v1.17 and the relevant parts to 
v1.16 (#9403)
8c7001eb46 is described below

commit 8c7001eb462a69641b3f0546b7efeddb63e501be
Author: pvary <[email protected]>
AuthorDate: Wed Jan 3 17:53:34 2024 +0100

    Flink: Backport #9308 to v1.17 and the relevant parts to v1.16 (#9403)
---
 .../source/reader/IcebergSourceSplitReader.java    | 13 ++++++++++--
 .../TestIcebergSourceWithWatermarkExtractor.java   |  2 +-
 .../source/reader/IcebergSourceSplitReader.java    | 24 ++++++++++++++++++++--
 .../TestIcebergSourceWithWatermarkExtractor.java   |  7 ++-----
 4 files changed, 36 insertions(+), 10 deletions(-)

diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
index 4e270dfa3d..7ee75d2ade 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.flink.source.reader;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.List;
 import java.util.Queue;
@@ -35,6 +34,7 @@ import 
org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.flink.source.split.SerializableComparator;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,9 +60,18 @@ class IcebergSourceSplitReader<T> implements 
SplitReader<RecordAndPosition<T>, I
     this.openSplitFunction = openSplitFunction;
     this.splitComparator = splitComparator;
     this.indexOfSubtask = context.getIndexOfSubtask();
-    this.splits = new ArrayDeque<>();
+    this.splits = Queues.newArrayDeque();
   }
 
+  /**
+   * The method reads a batch of records from the assigned splits. If all the 
records from the
+   * current split are returned then it will emit a {@link 
ArrayBatchRecords#finishedSplit(String)}
+   * batch to signal this event. In the next fetch loop the reader will 
continue with the next split
+   * (if any).
+   *
+   * @return The fetched records
+   * @throws IOException If there is an error during reading
+   */
   @Override
   public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
     metrics.incrementSplitReaderFetchCalls(1);
diff --git 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
index 7428d30f42..2ef8f79aa3 100644
--- 
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
+++ 
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
@@ -380,7 +380,7 @@ public class TestIcebergSourceWithWatermarkExtractor 
implements Serializable {
         .project(TestFixtures.TS_SCHEMA)
         .splitSize(100L)
         .streaming(true)
-        .monitorInterval(Duration.ofMillis(2))
+        .monitorInterval(Duration.ofMillis(10))
         
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
         .build();
   }
diff --git 
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
index 4e270dfa3d..9c20494fdb 100644
--- 
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
+++ 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
@@ -20,7 +20,7 @@ package org.apache.iceberg.flink.source.reader;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.util.ArrayDeque;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Queue;
@@ -35,6 +35,7 @@ import 
org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.flink.source.split.SerializableComparator;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,9 +61,18 @@ class IcebergSourceSplitReader<T> implements 
SplitReader<RecordAndPosition<T>, I
     this.openSplitFunction = openSplitFunction;
     this.splitComparator = splitComparator;
     this.indexOfSubtask = context.getIndexOfSubtask();
-    this.splits = new ArrayDeque<>();
+    this.splits = Queues.newArrayDeque();
   }
 
+  /**
+   * The method reads a batch of records from the assigned splits. If all the 
records from the
+   * current split are returned then it will emit a {@link 
ArrayBatchRecords#finishedSplit(String)}
+   * batch to signal this event. In the next fetch loop the reader will 
continue with the next split
+   * (if any).
+   *
+   * @return The fetched records
+   * @throws IOException If there is an error during reading
+   */
   @Override
   public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
     metrics.incrementSplitReaderFetchCalls(1);
@@ -123,6 +133,16 @@ class IcebergSourceSplitReader<T> implements 
SplitReader<RecordAndPosition<T>, I
     }
   }
 
+  @Override
+  public void pauseOrResumeSplits(
+      Collection<IcebergSourceSplit> splitsToPause, 
Collection<IcebergSourceSplit> splitsToResume) {
+    // IcebergSourceSplitReader only reads splits sequentially. When waiting 
for watermark alignment
+    // the SourceOperator will stop processing and recycling the fetched 
batches. This exhausts the
+    // {@link ArrayPoolDataIteratorBatcher#pool} and the 
`currentReader.next()` call will be
+    // blocked even without split-level watermark alignment. Based on this the
+    // `pauseOrResumeSplits` and the `wakeUp` are left empty.
+  }
+
   private long calculateBytes(IcebergSourceSplit split) {
     return split.task().files().stream().map(FileScanTask::length).reduce(0L, 
Long::sum);
   }
diff --git 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
index c4070212df..2ef8f79aa3 100644
--- 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
+++ 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
@@ -40,7 +40,6 @@ import 
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.metrics.MetricNames;
@@ -95,9 +94,7 @@ public class TestIcebergSourceWithWatermarkExtractor 
implements Serializable {
               .setNumberTaskManagers(1)
               .setNumberSlotsPerTaskManager(PARALLELISM)
               .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-              .setConfiguration(
-                  reporter.addToConfiguration(
-                      new 
Configuration().set(PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS, true)))
+              .setConfiguration(reporter.addToConfiguration(new 
Configuration()))
               .withHaLeadershipControl()
               .build());
 
@@ -383,7 +380,7 @@ public class TestIcebergSourceWithWatermarkExtractor 
implements Serializable {
         .project(TestFixtures.TS_SCHEMA)
         .splitSize(100L)
         .streaming(true)
-        .monitorInterval(Duration.ofMillis(2))
+        .monitorInterval(Duration.ofMillis(10))
         
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
         .build();
   }

Reply via email to