This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 69df0c4313 [core][flink] Streaming first plan refactor level0 filtering to avoid that full mode wrong (#5866)
69df0c4313 is described below

commit 69df0c4313bf1619577ba5e2ae59bee7c4adca07
Author: yuzelin <33053040+yuze...@users.noreply.github.com>
AuthorDate: Thu Jul 17 13:14:34 2025 +0800

    [core][flink] Streaming first plan refactor level0 filtering to avoid that full mode wrong (#5866)
---
 .../paimon/table/source/DataTableStreamScan.java   | 16 ++---
 .../apache/paimon/flink/DeletionVectorITCase.java  | 75 ++++++++++++++++------
 2 files changed, 61 insertions(+), 30 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index a6ab782dba..25563ded2b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -22,7 +22,6 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.StreamScanMode;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.consumer.Consumer;
-import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.TableSchema;
@@ -50,6 +49,7 @@ import javax.annotation.Nullable;
 import java.util.List;
 
 import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
+import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
 import static org.apache.paimon.CoreOptions.StreamScanMode.FILE_MONITOR;
 
 /** {@link StreamTableScan} implementation for streaming planning. */
@@ -157,7 +157,8 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
         StartingScanner.Result result;
         if (scanMode == FILE_MONITOR) {
             result = startingScanner.scan(snapshotReader);
-        } else if (options.needLookup()) {
+        } else if (options.changelogProducer().equals(LOOKUP)) {
+            // level0 data will be compacted to produce changelog in the future
             result = startingScanner.scan(snapshotReader.withLevelFilter(level -> level > 0));
             snapshotReader.withLevelFilter(Filter.alwaysTrue());
         } else if (options.changelogProducer().equals(FULL_COMPACTION)) {
@@ -174,16 +175,7 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
             ScannedResult scannedResult = (ScannedResult) result;
             currentWatermark = scannedResult.currentWatermark();
             long currentSnapshotId = scannedResult.currentSnapshotId();
-            LookupStrategy lookupStrategy = options.lookupStrategy();
-            if (scanMode == FILE_MONITOR) {
-                nextSnapshotId = currentSnapshotId + 1;
-            } else if (!lookupStrategy.produceChangelog && lookupStrategy.deletionVector) {
-                // For DELETION_VECTOR_ONLY mode, we need to return the remaining data from level 0
-                // in the subsequent plan.
-                nextSnapshotId = currentSnapshotId;
-            } else {
-                nextSnapshotId = currentSnapshotId + 1;
-            }
+            nextSnapshotId = currentSnapshotId + 1;
             isFullPhaseEnd =
                     boundedChecker.shouldEndInput(snapshotManager.snapshot(currentSnapshotId));
             LOG.debug(
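The behavioral core of the patch is the branch above. Previously, any table for which options.needLookup() held (including deletion-vector tables that produce no changelog) had level 0 filtered out of the first full plan, compensated by re-reading the same snapshot (nextSnapshotId = currentSnapshotId), which could drop level-0 rows in full mode. Now only 'changelog-producer' = 'lookup' skips level 0, because its level-0 data will be re-emitted as changelog after compaction, and the next snapshot is always currentSnapshotId + 1. A minimal, self-contained sketch of the new decision, using hypothetical names rather than the real Paimon classes:

    import java.util.function.IntPredicate;

    public class FirstPlanLevelFilter {

        enum ChangelogProducer { NONE, INPUT, FULL_COMPACTION, LOOKUP }

        // Hypothetical stand-in for the decision this commit changes: which
        // file levels the first streaming (full-phase) plan is allowed to read.
        static IntPredicate firstPlanLevels(ChangelogProducer producer, boolean fileMonitor) {
            if (fileMonitor) {
                return level -> true; // FILE_MONITOR always reads everything
            }
            if (producer == ChangelogProducer.LOOKUP) {
                // Level-0 data will be compacted and re-emitted as changelog
                // later, so the first plan may safely skip it.
                return level -> level > 0;
            }
            // Everything else, including deletion-vector tables without a lookup
            // changelog, must read level 0 here, or the full phase loses rows.
            return level -> true;
        }

        public static void main(String[] args) {
            System.out.println(firstPlanLevels(ChangelogProducer.NONE, false).test(0));   // true
            System.out.println(firstPlanLevels(ChangelogProducer.LOOKUP, false).test(0)); // false
        }
    }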
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
index a5b1795037..34200cbc41 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
@@ -70,14 +70,14 @@ public class DeletionVectorITCase extends CatalogITCaseBase {
         try (BlockingIterator<Row, Row> iter =
                 streamSqlBlockIter(
                         "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */")) {
-            assertThat(iter.collect(8))
+
+            // the first two values will be merged
+            assertThat(iter.collect(6))
                     .containsExactlyInAnyOrder(
                             Row.ofKind(RowKind.INSERT, 1, "111111111"),
-                            Row.ofKind(RowKind.INSERT, 2, "2"),
-                            Row.ofKind(RowKind.INSERT, 3, "3"),
-                            Row.ofKind(RowKind.INSERT, 4, "4"),
                             Row.ofKind(RowKind.INSERT, 2, "2_1"),
                             Row.ofKind(RowKind.INSERT, 3, "3_1"),
+                            Row.ofKind(RowKind.INSERT, 4, "4"),
                             Row.ofKind(RowKind.INSERT, 2, "2_2"),
                             Row.ofKind(RowKind.INSERT, 4, "4_1"));
         }
@@ -118,20 +118,34 @@ public class DeletionVectorITCase extends CatalogITCaseBase {
         try (BlockingIterator<Row, Row> iter =
                 streamSqlBlockIter(
                         "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */")) {
-            assertThat(iter.collect(12))
-                    .containsExactlyInAnyOrder(
-                            Row.ofKind(RowKind.INSERT, 1, "111111111"),
-                            Row.ofKind(RowKind.INSERT, 2, "2"),
-                            Row.ofKind(RowKind.INSERT, 3, "3"),
-                            Row.ofKind(RowKind.INSERT, 4, "4"),
-                            Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2"),
-                            Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_1"),
-                            Row.ofKind(RowKind.UPDATE_BEFORE, 3, "3"),
-                            Row.ofKind(RowKind.UPDATE_AFTER, 3, "3_1"),
-                            Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2_1"),
-                            Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_2"),
-                            Row.ofKind(RowKind.UPDATE_BEFORE, 4, "4"),
-                            Row.ofKind(RowKind.UPDATE_AFTER, 4, "4_1"));
+            if (changelogProducer.equals("none")) {
+                // the first two values will be merged
+                assertThat(iter.collect(8))
+                        .containsExactlyInAnyOrder(
+                                Row.ofKind(RowKind.INSERT, 1, "111111111"),
+                                Row.ofKind(RowKind.INSERT, 2, "2_1"),
+                                Row.ofKind(RowKind.INSERT, 3, "3_1"),
+                                Row.ofKind(RowKind.INSERT, 4, "4"),
+                                Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2_1"),
+                                Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_2"),
+                                Row.ofKind(RowKind.UPDATE_BEFORE, 4, "4"),
+                                Row.ofKind(RowKind.UPDATE_AFTER, 4, "4_1"));
+            } else {
+                assertThat(iter.collect(12))
+                        .containsExactlyInAnyOrder(
+                                Row.ofKind(RowKind.INSERT, 1, "111111111"),
+                                Row.ofKind(RowKind.INSERT, 2, "2"),
+                                Row.ofKind(RowKind.INSERT, 3, "3"),
+                                Row.ofKind(RowKind.INSERT, 4, "4"),
+                                Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2"),
+                                Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_1"),
+                                Row.ofKind(RowKind.UPDATE_BEFORE, 3, "3"),
+                                Row.ofKind(RowKind.UPDATE_AFTER, 3, "3_1"),
+                                Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2_1"),
+                                Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_2"),
+                                Row.ofKind(RowKind.UPDATE_BEFORE, 4, "4"),
+                                Row.ofKind(RowKind.UPDATE_AFTER, 4, "4_1"));
+            }
         }
 
         // test read from COMPACT snapshot
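The split expectations follow from the fix: with 'changelog-producer' = 'none', the first full plan now reads level 0 as well, so the full phase returns the per-key merged view (key 2 already at "2_1", key 3 at "3_1") instead of every raw write, while the lookup producer still skips level 0 and later re-emits it as changelog, preserving all twelve change rows. A toy illustration of that merged full phase (a plain Java map, not Paimon's actual merge engine):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class MergeByKeyToy {
        public static void main(String[] args) {
            // Writes up to snapshot 3, in commit order; keys 2 and 3 are updated.
            String[][] writes = {
                {"1", "111111111"}, {"2", "2"}, {"3", "3"}, {"4", "4"},
                {"2", "2_1"}, {"3", "3_1"}
            };
            Map<String, String> merged = new LinkedHashMap<>();
            for (String[] w : writes) {
                merged.put(w[0], w[1]); // deduplicate semantics: last value per key wins
            }
            // The full phase shrinks from 6 raw writes to 4 merged rows, which
            // is why the assertions above now collect 6 and 8 rows instead of
            // 8 and 12.
            merged.forEach((k, v) -> System.out.println("+I[" + k + ", " + v + "]"));
        }
    }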
TT").size()).isEqualTo(5); } + + // No compaction to verify that level0 data can be read at full phase + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testStreamingReadFullWithoutCompact(boolean isPk) throws Exception { + if (isPk) { + sql( + "CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b INT) " + + "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = 'none', 'write-only' = 'true')"); + } else { + sql( + "CREATE TABLE T (a INT, b INT) WITH ('deletion-vectors.enabled' = 'true', 'write-only' = 'true')"); + } + + sql("INSERT INTO T VALUES (1, 1)"); + sql("INSERT INTO T VALUES (2, 2)"); + sql("INSERT INTO T VALUES (3, 3)"); + + try (BlockingIterator<Row, Row> iter = + streamSqlBlockIter( + "SELECT * FROM T /*+ OPTIONS('scan.mode' = 'from-snapshot-full', 'scan.snapshot-id' = '2') */")) { + assertThat(iter.collect(3)) + .containsExactlyInAnyOrder(Row.of(1, 1), Row.of(2, 2), Row.of(3, 3)); + } + } }