This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f353ca0e4 [core] Compacted StartingScanner is incorrect (#857)
f353ca0e4 is described below

commit f353ca0e45d206342365f2a0e47619c8b1d4b095
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Apr 11 10:41:37 2023 +0800

    [core] Compacted StartingScanner is incorrect (#857)
---
 .../source/snapshot/CompactedStartingScanner.java  | 19 ++++++--
 .../snapshot/FullCompactedStartingScanner.java     | 22 ++-------
 .../org/apache/paimon/utils/SnapshotManager.java   |  4 +-
 .../snapshot/CompactedStartingScannerTest.java     |  4 +-
 ....java => FullCompactedStartingScannerTest.java} | 54 +++++++++++-----------
 .../apache/paimon/flink/BatchFileStoreITCase.java  |  7 ---
 .../apache/paimon/flink/BatchFileStoreITCase.java  |  7 ---
 .../apache/paimon/flink/BatchFileStoreITCase.java  |  7 ---
 8 files changed, 51 insertions(+), 73 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
index 63f5f7578..adb9d2fee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
@@ -35,11 +35,19 @@ public class CompactedStartingScanner implements 
StartingScanner {
     @Override
     @Nullable
     public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader 
snapshotSplitReader) {
-        Long startingSnapshotId = snapshotManager.latestCompactedSnapshotId();
+        Long startingSnapshotId = pick(snapshotManager);
         if (startingSnapshotId == null) {
-            LOG.debug("There is currently no compact snapshot. Waiting for 
snapshot generation.");
-            return null;
+            startingSnapshotId = snapshotManager.latestSnapshotId();
+            if (startingSnapshotId == null) {
+                LOG.debug("There is currently no snapshot. Wait for the 
snapshot generation.");
+                return null;
+            } else {
+                LOG.debug(
+                        "No compact snapshot found, reading from the latest 
snapshot {}.",
+                        startingSnapshotId);
+            }
         }
+
         return new Result(
                 startingSnapshotId,
                 snapshotSplitReader
@@ -47,4 +55,9 @@ public class CompactedStartingScanner implements 
StartingScanner {
                         .withSnapshot(startingSnapshotId)
                         .splits());
     }
+
+    @Nullable
+    protected Long pick(SnapshotManager snapshotManager) {
+        return snapshotManager.latestCompactedSnapshotId();
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
index bb7b38b4b..46b2f2453 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
@@ -21,21 +21,15 @@ package org.apache.paimon.table.source.snapshot;
 import org.apache.paimon.CoreOptions.StartupMode;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.Snapshot.CommitKind;
-import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.utils.SnapshotManager;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import javax.annotation.Nullable;
 
 /**
  * {@link StartingScanner} for the {@link StartupMode#COMPACTED_FULL} startup 
mode with
  * 'full-compaction.delta-commits'.
  */
-public class FullCompactedStartingScanner implements StartingScanner {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(FullCompactedStartingScanner.class);
+public class FullCompactedStartingScanner extends CompactedStartingScanner {
 
     private final int deltaCommits;
 
@@ -45,18 +39,8 @@ public class FullCompactedStartingScanner implements 
StartingScanner {
 
     @Override
     @Nullable
-    public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader 
snapshotSplitReader) {
-        Long startingSnapshotId = snapshotManager.pickSnapshot(this::picked);
-        if (startingSnapshotId == null) {
-            LOG.debug("There is currently no compact snapshot. Waiting for 
snapshot generation.");
-            return null;
-        }
-        return new Result(
-                startingSnapshotId,
-                snapshotSplitReader
-                        .withKind(ScanKind.ALL)
-                        .withSnapshot(startingSnapshotId)
-                        .splits());
+    protected Long pick(SnapshotManager snapshotManager) {
+        return snapshotManager.pickFromLatest(this::picked);
     }
 
     private boolean picked(Snapshot snapshot) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 1ef5d8a26..077fe3cd0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -100,10 +100,10 @@ public class SnapshotManager implements Serializable {
     }
 
     public @Nullable Long latestCompactedSnapshotId() {
-        return pickSnapshot(s -> s.commitKind() == CommitKind.COMPACT);
+        return pickFromLatest(s -> s.commitKind() == CommitKind.COMPACT);
     }
 
-    public @Nullable Long pickSnapshot(Predicate<Snapshot> predicate) {
+    public @Nullable Long pickFromLatest(Predicate<Snapshot> predicate) {
         Long latestId = latestSnapshotId();
         Long earliestId = earliestSnapshotId();
         if (latestId == null || earliestId == null) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
index 42ec5dc76..01e40cfea 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
@@ -86,7 +86,9 @@ public class CompactedStartingScannerTest extends 
ScannerTestBase {
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(1);
 
         CompactedStartingScanner scanner = new CompactedStartingScanner();
-        assertThat(scanner.scan(snapshotManager, 
snapshotSplitReader)).isNull();
+
+        // No compact snapshot found, reading from the latest snapshot
+        assertThat(scanner.scan(snapshotManager, 
snapshotSplitReader).snapshotId()).isEqualTo(1);
 
         write.close();
         commit.close();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
similarity index 78%
copy from paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
copy to paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
index 42ec5dc76..ba180684e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
@@ -25,12 +25,10 @@ import org.apache.paimon.utils.SnapshotManager;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.Arrays;
-
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link CompactedStartingScanner}. */
-public class CompactedStartingScannerTest extends ScannerTestBase {
+/** Tests for {@link FullCompactedStartingScanner}. */
+public class FullCompactedStartingScannerTest extends ScannerTestBase {
 
     @Test
     public void testScan() throws Exception {
@@ -38,28 +36,18 @@ public class CompactedStartingScannerTest extends 
ScannerTestBase {
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
 
-        write.write(rowData(1, 10, 100L));
-        write.write(rowData(1, 20, 200L));
-        write.write(rowData(1, 40, 400L));
-        commit.commit(0, write.prepareCommit(true, 0));
+        for (int i = 0; i < 5; i++) {
+            write.write(rowData(1, 10, 102L));
+            write.write(rowData(1, 20, 201L));
+            write.compact(binaryRow(1), 0, true);
+            commit.commit(i, write.prepareCommit(true, i));
+        }
 
-        write.write(rowData(1, 10, 101L));
-        write.write(rowData(1, 30, 300L));
-        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
-        write.compact(binaryRow(1), 0, true);
-        commit.commit(1, write.prepareCommit(true, 1));
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(10);
 
-        write.write(rowData(1, 10, 102L));
-        write.write(rowData(1, 20, 201L));
-        commit.commit(2, write.prepareCommit(true, 2));
-
-        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
-
-        CompactedStartingScanner scanner = new CompactedStartingScanner();
+        FullCompactedStartingScanner scanner = new 
FullCompactedStartingScanner(3);
         StartingScanner.Result result = scanner.scan(snapshotManager, 
snapshotSplitReader);
-        assertThat(result.snapshotId()).isEqualTo(3);
-        assertThat(getResult(table.newRead(), toSplits(result.splits())))
-                .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", 
"+I 1|30|300"));
+        assertThat(result.snapshotId()).isEqualTo(8);
 
         write.close();
         commit.close();
@@ -68,7 +56,7 @@ public class CompactedStartingScannerTest extends 
ScannerTestBase {
     @Test
     public void testNoSnapshot() {
         SnapshotManager snapshotManager = table.snapshotManager();
-        CompactedStartingScanner scanner = new CompactedStartingScanner();
+        FullCompactedStartingScanner scanner = new 
FullCompactedStartingScanner(3);
         assertThat(scanner.scan(snapshotManager, 
snapshotSplitReader)).isNull();
     }
 
@@ -83,10 +71,22 @@ public class CompactedStartingScannerTest extends 
ScannerTestBase {
         write.write(rowData(1, 40, 400L));
         commit.commit(0, write.prepareCommit(true, 0));
 
-        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(1);
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(1, write.prepareCommit(true, 1));
 
-        CompactedStartingScanner scanner = new CompactedStartingScanner();
-        assertThat(scanner.scan(snapshotManager, 
snapshotSplitReader)).isNull();
+        write.write(rowData(1, 10, 102L));
+        write.write(rowData(1, 20, 201L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
+
+        FullCompactedStartingScanner scanner = new 
FullCompactedStartingScanner(3);
+
+        // No compact snapshot found, reading from the latest snapshot
+        assertThat(scanner.scan(snapshotManager, 
snapshotSplitReader).snapshotId()).isEqualTo(4);
 
         write.close();
         commit.close();
diff --git a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 6c1627998..59cc98ce4 100644
--- a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -51,13 +51,6 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
         assertThat(batchSql("SELECT * FROM T")).isEmpty();
     }
 
-    @Test
-    public void testCompactedScanModeEmpty() {
-        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
-        assertThat(batchSql("SELECT * FROM T /*+ 
OPTIONS('scan.mode'='compacted-full') */"))
-                .isEmpty();
-    }
-
     @Test
     public void testTimeTravelRead() throws InterruptedException {
         batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
diff --git a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 6c1627998..59cc98ce4 100644
--- a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -51,13 +51,6 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
         assertThat(batchSql("SELECT * FROM T")).isEmpty();
     }
 
-    @Test
-    public void testCompactedScanModeEmpty() {
-        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
-        assertThat(batchSql("SELECT * FROM T /*+ 
OPTIONS('scan.mode'='compacted-full') */"))
-                .isEmpty();
-    }
-
     @Test
     public void testTimeTravelRead() throws InterruptedException {
         batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 9ca680e46..4c8e1967d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -46,13 +46,6 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
         assertThat(batchSql("SELECT * FROM T")).isEmpty();
     }
 
-    @Test
-    public void testCompactedScanModeEmpty() {
-        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
-        assertThat(batchSql("SELECT * FROM T /*+ 
OPTIONS('scan.mode'='compacted-full') */"))
-                .isEmpty();
-    }
-
     @Test
     public void testTimeTravelRead() throws InterruptedException {
         batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");

Reply via email to