This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.6 in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 55957987e7430fdef81bdbd85f8b5db8313a839d
Author: JunZhang <[email protected]>
AuthorDate: Fri Dec 29 18:37:43 2023 +0800

    [core] fix streaming read empty table in latest mode (#2589)

    Co-authored-by: yuzelin <[email protected]>
---
 .../snapshot/ContinuousLatestStartingScanner.java       | 12 +++++++++---
 .../apache/paimon/flink/ContinuousFileStoreITCase.java  | 17 +++++++++++++++++
 2 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
index c1a6054ae..96c3e700f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
@@ -40,11 +41,16 @@ public class ContinuousLatestStartingScanner extends AbstractStartingScanner {
 
     @Override
     public Result scan(SnapshotReader snapshotReader) {
-        Long startingSnapshotId = snapshotManager.latestSnapshotId();
-        if (startingSnapshotId == null) {
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
+        if (latestSnapshotId == null) {
             LOG.debug("There is currently no snapshot. Wait for the snapshot generation.");
             return new NoSnapshot();
         }
-        return new NextSnapshot(startingSnapshotId + 1);
+
+        // If there's no snapshot before the reading job starts,
+        // then the first snapshot should be considered as an incremental snapshot
+        long nextSnapshot =
+                startingSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshotId + 1;
+        return new NextSnapshot(nextSnapshot);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index eda44b0bc..767f2bfe6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -216,6 +216,23 @@ public class ContinuousFileStoreITCase extends CatalogITCaseBase {
         iterator.close();
     }
 
+    @Test
+    public void testContinuousLatestStartingFromEmpty() throws Exception {
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(
+                        streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('scan.mode'='latest') */"));
+
+        sql("INSERT INTO T1 VALUES ('1', 'Hello', 'World')");
+        sql("INSERT INTO T1 VALUES ('2', 'Apache', 'Paimon')");
+        sql("INSERT INTO T1 VALUES ('3', 'C', 'c')");
+
+        assertThat(iterator.collect(3))
+                .containsExactlyInAnyOrder(
+                        Row.of("1", "Hello", "World"),
+                        Row.of("2", "Apache", "Paimon"),
+                        Row.of("3", "C", "c"));
+    }
+
     @Test
     public void testContinuousFromTimestamp() throws Exception {
         String sql =
