This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new b78d336163 Flink: use correct scan mode when in TABLE_SCAN_THEN_INCREMENTAL mode (#7338)
b78d336163 is described below
commit b78d3361635e924ed23fd8fe87b5f966b1953dd9
Author: Chen, Junjie <[email protected]>
AuthorDate: Sun Apr 16 11:58:29 2023 +0800
Flink: use correct scan mode when in TABLE_SCAN_THEN_INCREMENTAL mode (#7338)
---
.../iceberg/flink/source/FlinkSplitPlanner.java | 10 +++--
.../enumerator/ContinuousSplitPlannerImpl.java | 4 +-
.../flink/source/TestIcebergSourceContinuous.java | 51 ++++++++++++++++++++++
3 files changed, 60 insertions(+), 5 deletions(-)
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
index 38a55e437d..ea317e93d8 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Tasks;
@@ -136,14 +137,15 @@ public class FlinkSplitPlanner {
}
}
- private enum ScanMode {
+ @VisibleForTesting
+ enum ScanMode {
BATCH,
INCREMENTAL_APPEND_SCAN
}
- private static ScanMode checkScanMode(ScanContext context) {
- if (context.isStreaming()
- || context.startSnapshotId() != null
+ @VisibleForTesting
+ static ScanMode checkScanMode(ScanContext context) {
+ if (context.startSnapshotId() != null
|| context.endSnapshotId() != null
|| context.startTag() != null
|| context.endTag() != null) {
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
index 85ecaaee1f..f0d8ca8d70 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
@@ -166,7 +166,9 @@ public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
if (scanContext.streamingStartingStrategy()
== StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) {
// do a batch table scan first
- splits = FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext,
workerPool);
+ splits =
+ FlinkSplitPlanner.planIcebergSourceSplits(
+ table,
scanContext.copyWithSnapshotId(startSnapshot.snapshotId()), workerPool);
LOG.info(
"Discovered {} splits from initial batch table scan with snapshot Id
{}",
splits.size(),
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
index 582a125233..6d26f933b3 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.data.RowDataToRowMapper;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@@ -111,6 +112,56 @@ public class TestIcebergSourceContinuous {
}
}
+ @Test
+ public void testTableScanThenIncrementalAfterExpiration() throws Exception {
+ GenericAppenderHelper dataAppender =
+ new GenericAppenderHelper(tableResource.table(), FileFormat.PARQUET,
TEMPORARY_FOLDER);
+
+ // snapshot1
+ List<Record> batch1 =
+ RandomGenericData.generate(tableResource.table().schema(), 2,
randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch1);
+ long snapshotId = tableResource.table().currentSnapshot().snapshotId();
+
+ // snapshot2
+ List<Record> batch2 =
+ RandomGenericData.generate(tableResource.table().schema(), 2,
randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch2);
+
+
tableResource.table().expireSnapshots().expireSnapshotId(snapshotId).commit();
+
+ Assert.assertEquals(1, tableResource.table().history().size());
+
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+ .monitorInterval(Duration.ofMillis(10L))
+
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+ .build();
+
+ Assert.assertEquals(
+ FlinkSplitPlanner.ScanMode.BATCH,
FlinkSplitPlanner.checkScanMode(scanContext));
+
+ try (CloseableIterator<Row> iter =
+
createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+ List<Row> result1 = waitForResult(iter, 4);
+ List<Record> initialRecords = Lists.newArrayList();
+ initialRecords.addAll(batch1);
+ initialRecords.addAll(batch2);
+ TestHelpers.assertRecords(result1, initialRecords,
tableResource.table().schema());
+
+ // snapshot3
+ List<Record> batch3 =
+ RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch3);
+ tableResource.table().currentSnapshot().snapshotId();
+
+ List<Row> result3 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result3, batch3,
tableResource.table().schema());
+ }
+ }
+
@Test
public void testEarliestSnapshot() throws Exception {
GenericAppenderHelper dataAppender =