This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 69df0c4313 [core][flink] Refactor level0 filtering in the streaming first plan to avoid wrong results in full mode (#5866)
69df0c4313 is described below

commit 69df0c4313bf1619577ba5e2ae59bee7c4adca07
Author: yuzelin <33053040+yuze...@users.noreply.github.com>
AuthorDate: Thu Jul 17 13:14:34 2025 +0800

    [core][flink] Refactor level0 filtering in the streaming first plan to avoid wrong results in full mode (#5866)
---
 .../paimon/table/source/DataTableStreamScan.java   | 16 ++---
 .../apache/paimon/flink/DeletionVectorITCase.java  | 75 ++++++++++++++++------
 2 files changed, 61 insertions(+), 30 deletions(-)
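
The heart of the change is in DataTableStreamScan: the first (full) plan now
skips level-0 files only when the changelog producer is LOOKUP, rather than
whenever options.needLookup() holds, and the DELETION_VECTOR_ONLY special case
for nextSnapshotId is dropped. A minimal, self-contained sketch of the new
decision logic (names simplified; this models the patch's intent and is not
the actual Paimon API):

    public class FirstPlanSketch {
        enum ChangelogProducer { NONE, LOOKUP, FULL_COMPACTION }

        // Should the first plan skip level-0 files? Only for LOOKUP: that
        // level-0 data will be compacted later to produce changelog, so
        // reading it now would surface records twice. Deletion-vector tables
        // without a changelog producer must read level 0, otherwise the full
        // phase silently loses data.
        static boolean skipLevel0InFirstPlan(ChangelogProducer producer) {
            return producer == ChangelogProducer.LOOKUP;
        }

        // The removed DELETION_VECTOR_ONLY branch returned currentSnapshotId
        // so remaining level-0 data could be re-read later; since level 0 is
        // now read up front, the scan always advances by one snapshot.
        static long nextSnapshotId(long currentSnapshotId) {
            return currentSnapshotId + 1;
        }

        public static void main(String[] args) {
            System.out.println(skipLevel0InFirstPlan(ChangelogProducer.LOOKUP)); // true
            System.out.println(skipLevel0InFirstPlan(ChangelogProducer.NONE));   // false
            System.out.println(nextSnapshotId(3));                               // 4
        }
    }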

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index a6ab782dba..25563ded2b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -22,7 +22,6 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.StreamScanMode;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.consumer.Consumer;
-import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.TableSchema;
@@ -50,6 +49,7 @@ import javax.annotation.Nullable;
 import java.util.List;
 
 import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
+import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
 import static org.apache.paimon.CoreOptions.StreamScanMode.FILE_MONITOR;
 
 /** {@link StreamTableScan} implementation for streaming planning. */
@@ -157,7 +157,8 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
         StartingScanner.Result result;
         if (scanMode == FILE_MONITOR) {
             result = startingScanner.scan(snapshotReader);
-        } else if (options.needLookup()) {
+        } else if (options.changelogProducer().equals(LOOKUP)) {
+            // level0 data will be compacted to produce changelog in the future
             result = startingScanner.scan(snapshotReader.withLevelFilter(level -> level > 0));
             snapshotReader.withLevelFilter(Filter.alwaysTrue());
         } else if (options.changelogProducer().equals(FULL_COMPACTION)) {
@@ -174,16 +175,7 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
             ScannedResult scannedResult = (ScannedResult) result;
             currentWatermark = scannedResult.currentWatermark();
             long currentSnapshotId = scannedResult.currentSnapshotId();
-            LookupStrategy lookupStrategy = options.lookupStrategy();
-            if (scanMode == FILE_MONITOR) {
-                nextSnapshotId = currentSnapshotId + 1;
-            } else if (!lookupStrategy.produceChangelog && lookupStrategy.deletionVector) {
-                // For DELETION_VECTOR_ONLY mode, we need to return the remaining data from level 0
-                // in the subsequent plan.
-                nextSnapshotId = currentSnapshotId;
-            } else {
-                nextSnapshotId = currentSnapshotId + 1;
-            }
+            nextSnapshotId = currentSnapshotId + 1;
             isFullPhaseEnd =
                     boundedChecker.shouldEndInput(snapshotManager.snapshot(currentSnapshotId));
             LOG.debug(
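
Note the reset right after the filtered scan above: the same snapshotReader
instance appears to serve the later incremental plans, so the level filter set
for the first plan is cleared again with Filter.alwaysTrue(). A stand-alone
model of that set-scan-reset pattern (IntPredicate stands in for Paimon's
Filter; a sketch only, not the real reader):

    import java.util.function.IntPredicate;

    public class LevelFilterSketch {
        // Default filter: read every level.
        private IntPredicate levelFilter = level -> true;

        void firstPlan() {
            levelFilter = level -> level > 0; // skip level 0 (LOOKUP producer)
            scan("first plan");
            levelFilter = level -> true;      // reset, or later plans would skip level 0 too
        }

        void scan(String label) {
            for (int level = 0; level <= 2; level++) {
                if (levelFilter.test(level)) {
                    System.out.println(label + ": reading level " + level);
                }
            }
        }

        public static void main(String[] args) {
            LevelFilterSketch reader = new LevelFilterSketch();
            reader.firstPlan();         // reads levels 1 and 2
            reader.scan("incremental"); // reads levels 0, 1 and 2 again
        }
    }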
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
index a5b1795037..34200cbc41 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
@@ -70,14 +70,14 @@ public class DeletionVectorITCase extends CatalogITCaseBase {
         try (BlockingIterator<Row, Row> iter =
                 streamSqlBlockIter(
                         "SELECT * FROM T /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */")) {
-            assertThat(iter.collect(8))
+
+            // the first two values will be merged
+            assertThat(iter.collect(6))
                     .containsExactlyInAnyOrder(
                             Row.ofKind(RowKind.INSERT, 1, "111111111"),
-                            Row.ofKind(RowKind.INSERT, 2, "2"),
-                            Row.ofKind(RowKind.INSERT, 3, "3"),
-                            Row.ofKind(RowKind.INSERT, 4, "4"),
                             Row.ofKind(RowKind.INSERT, 2, "2_1"),
                             Row.ofKind(RowKind.INSERT, 3, "3_1"),
+                            Row.ofKind(RowKind.INSERT, 4, "4"),
                             Row.ofKind(RowKind.INSERT, 2, "2_2"),
                             Row.ofKind(RowKind.INSERT, 4, "4_1"));
         }
@@ -118,20 +118,34 @@ public class DeletionVectorITCase extends CatalogITCaseBase {
         try (BlockingIterator<Row, Row> iter =
                 streamSqlBlockIter(
                         "SELECT * FROM T /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */")) {
-            assertThat(iter.collect(12))
-                    .containsExactlyInAnyOrder(
-                            Row.ofKind(RowKind.INSERT, 1, "111111111"),
-                            Row.ofKind(RowKind.INSERT, 2, "2"),
-                            Row.ofKind(RowKind.INSERT, 3, "3"),
-                            Row.ofKind(RowKind.INSERT, 4, "4"),
-                            Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2"),
-                            Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_1"),
-                            Row.ofKind(RowKind.UPDATE_BEFORE, 3, "3"),
-                            Row.ofKind(RowKind.UPDATE_AFTER, 3, "3_1"),
-                            Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2_1"),
-                            Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_2"),
-                            Row.ofKind(RowKind.UPDATE_BEFORE, 4, "4"),
-                            Row.ofKind(RowKind.UPDATE_AFTER, 4, "4_1"));
+            if (changelogProducer.equals("none")) {
+                // the first two values will be merged
+                assertThat(iter.collect(8))
+                        .containsExactlyInAnyOrder(
+                                Row.ofKind(RowKind.INSERT, 1, "111111111"),
+                                Row.ofKind(RowKind.INSERT, 2, "2_1"),
+                                Row.ofKind(RowKind.INSERT, 3, "3_1"),
+                                Row.ofKind(RowKind.INSERT, 4, "4"),
+                                Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2_1"),
+                                Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_2"),
+                                Row.ofKind(RowKind.UPDATE_BEFORE, 4, "4"),
+                                Row.ofKind(RowKind.UPDATE_AFTER, 4, "4_1"));
+            } else {
+                assertThat(iter.collect(12))
+                        .containsExactlyInAnyOrder(
+                                Row.ofKind(RowKind.INSERT, 1, "111111111"),
+                                Row.ofKind(RowKind.INSERT, 2, "2"),
+                                Row.ofKind(RowKind.INSERT, 3, "3"),
+                                Row.ofKind(RowKind.INSERT, 4, "4"),
+                                Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2"),
+                                Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_1"),
+                                Row.ofKind(RowKind.UPDATE_BEFORE, 3, "3"),
+                                Row.ofKind(RowKind.UPDATE_AFTER, 3, "3_1"),
+                                Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2_1"),
+                                Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_2"),
+                                Row.ofKind(RowKind.UPDATE_BEFORE, 4, "4"),
+                                Row.ofKind(RowKind.UPDATE_AFTER, 4, "4_1"));
+            }
         }
 
         // test read from COMPACT snapshot
@@ -380,4 +394,29 @@ public class DeletionVectorITCase extends CatalogITCaseBase {
         sql("ALTER TABLE TT SET('deletion-vectors.enabled' = 'false')");
         assertThat(sql("SELECT * FROM TT").size()).isEqualTo(5);
     }
+
+    // No compaction to verify that level0 data can be read at full phase
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testStreamingReadFullWithoutCompact(boolean isPk) throws Exception {
+        if (isPk) {
+            sql(
+                    "CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b INT) "
+                            + "WITH ('deletion-vectors.enabled' = 'true', 
'changelog-producer' = 'none', 'write-only' = 'true')");
+        } else {
+            sql(
+                    "CREATE TABLE T (a INT, b INT) WITH 
('deletion-vectors.enabled' = 'true', 'write-only' = 'true')");
+        }
+
+        sql("INSERT INTO T VALUES (1, 1)");
+        sql("INSERT INTO T VALUES (2, 2)");
+        sql("INSERT INTO T VALUES (3, 3)");
+
+        try (BlockingIterator<Row, Row> iter =
+                streamSqlBlockIter(
+                        "SELECT * FROM T /*+ OPTIONS('scan.mode' = 
'from-snapshot-full', 'scan.snapshot-id' = '2') */")) {
+            assertThat(iter.collect(3))
+                    .containsExactlyInAnyOrder(Row.of(1, 1), Row.of(2, 2), Row.of(3, 3));
+        }
+    }
 }
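
The smaller expectations in DeletionVectorITCase follow directly from the
refactor: with changelog-producer 'none', level-0 files now enter the full
phase and records sharing a primary key are merged before being emitted, so
(2, "2") and (2, "2_1") collapse into a single INSERT. A toy last-write-wins
model of that merge, using the test's own values (not Paimon's actual merge
engine):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class MergeSketch {
        public static void main(String[] args) {
            // Writes across snapshots 1..3, in order: {key, value}.
            String[][] writes = {
                {"1", "111111111"}, {"2", "2"}, {"3", "3"}, {"4", "4"},
                {"2", "2_1"}, {"3", "3_1"}
            };
            Map<String, String> fullPhase = new LinkedHashMap<>();
            for (String[] w : writes) {
                fullPhase.put(w[0], w[1]); // latest value per key survives
            }
            // One INSERT per key: {1=111111111, 2=2_1, 3=3_1, 4=4}
            System.out.println(fullPhase);
        }
    }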
