This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f353ca0e4 [core] Compacted StartingScanner is incorrect (#857)
f353ca0e4 is described below
commit f353ca0e45d206342365f2a0e47619c8b1d4b095
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Apr 11 10:41:37 2023 +0800
[core] Compacted StartingScanner is incorrect (#857)
---
.../source/snapshot/CompactedStartingScanner.java | 19 ++++++--
.../snapshot/FullCompactedStartingScanner.java | 22 ++-------
.../org/apache/paimon/utils/SnapshotManager.java | 4 +-
.../snapshot/CompactedStartingScannerTest.java | 4 +-
....java => FullCompactedStartingScannerTest.java} | 54 +++++++++++-----------
.../apache/paimon/flink/BatchFileStoreITCase.java | 7 ---
.../apache/paimon/flink/BatchFileStoreITCase.java | 7 ---
.../apache/paimon/flink/BatchFileStoreITCase.java | 7 ---
8 files changed, 51 insertions(+), 73 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
index 63f5f7578..adb9d2fee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
@@ -35,11 +35,19 @@ public class CompactedStartingScanner implements StartingScanner {
@Override
@Nullable
public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader
snapshotSplitReader) {
- Long startingSnapshotId = snapshotManager.latestCompactedSnapshotId();
+ Long startingSnapshotId = pick(snapshotManager);
if (startingSnapshotId == null) {
- LOG.debug("There is currently no compact snapshot. Waiting for
snapshot generation.");
- return null;
+ startingSnapshotId = snapshotManager.latestSnapshotId();
+ if (startingSnapshotId == null) {
+ LOG.debug("There is currently no snapshot. Wait for the
snapshot generation.");
+ return null;
+ } else {
+ LOG.debug(
+ "No compact snapshot found, reading from the latest
snapshot {}.",
+ startingSnapshotId);
+ }
}
+
return new Result(
startingSnapshotId,
snapshotSplitReader
@@ -47,4 +55,9 @@ public class CompactedStartingScanner implements StartingScanner {
.withSnapshot(startingSnapshotId)
.splits());
}
+
+ @Nullable
+ protected Long pick(SnapshotManager snapshotManager) {
+ return snapshotManager.latestCompactedSnapshotId();
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
index bb7b38b4b..46b2f2453 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
@@ -21,21 +21,15 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions.StartupMode;
import org.apache.paimon.Snapshot;
import org.apache.paimon.Snapshot.CommitKind;
-import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.utils.SnapshotManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import javax.annotation.Nullable;
/**
* {@link StartingScanner} for the {@link StartupMode#COMPACTED_FULL} startup
mode with
* 'full-compaction.delta-commits'.
*/
-public class FullCompactedStartingScanner implements StartingScanner {
-
- private static final Logger LOG =
LoggerFactory.getLogger(FullCompactedStartingScanner.class);
+public class FullCompactedStartingScanner extends CompactedStartingScanner {
private final int deltaCommits;
@@ -45,18 +39,8 @@ public class FullCompactedStartingScanner implements StartingScanner {
@Override
@Nullable
- public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader
snapshotSplitReader) {
- Long startingSnapshotId = snapshotManager.pickSnapshot(this::picked);
- if (startingSnapshotId == null) {
- LOG.debug("There is currently no compact snapshot. Waiting for
snapshot generation.");
- return null;
- }
- return new Result(
- startingSnapshotId,
- snapshotSplitReader
- .withKind(ScanKind.ALL)
- .withSnapshot(startingSnapshotId)
- .splits());
+ protected Long pick(SnapshotManager snapshotManager) {
+ return snapshotManager.pickFromLatest(this::picked);
}
private boolean picked(Snapshot snapshot) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 1ef5d8a26..077fe3cd0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -100,10 +100,10 @@ public class SnapshotManager implements Serializable {
}
public @Nullable Long latestCompactedSnapshotId() {
- return pickSnapshot(s -> s.commitKind() == CommitKind.COMPACT);
+ return pickFromLatest(s -> s.commitKind() == CommitKind.COMPACT);
}
- public @Nullable Long pickSnapshot(Predicate<Snapshot> predicate) {
+ public @Nullable Long pickFromLatest(Predicate<Snapshot> predicate) {
Long latestId = latestSnapshotId();
Long earliestId = earliestSnapshotId();
if (latestId == null || earliestId == null) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
index 42ec5dc76..01e40cfea 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
@@ -86,7 +86,9 @@ public class CompactedStartingScannerTest extends ScannerTestBase {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(1);
CompactedStartingScanner scanner = new CompactedStartingScanner();
- assertThat(scanner.scan(snapshotManager,
snapshotSplitReader)).isNull();
+
+ // No compact snapshot found, reading from the latest snapshot
+ assertThat(scanner.scan(snapshotManager,
snapshotSplitReader).snapshotId()).isEqualTo(1);
write.close();
commit.close();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
similarity index 78%
copy from paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
copy to paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
index 42ec5dc76..ba180684e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScannerTest.java
@@ -25,12 +25,10 @@ import org.apache.paimon.utils.SnapshotManager;
import org.junit.jupiter.api.Test;
-import java.util.Arrays;
-
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link CompactedStartingScanner}. */
-public class CompactedStartingScannerTest extends ScannerTestBase {
+/** Tests for {@link FullCompactedStartingScanner}. */
+public class FullCompactedStartingScannerTest extends ScannerTestBase {
@Test
public void testScan() throws Exception {
@@ -38,28 +36,18 @@ public class CompactedStartingScannerTest extends ScannerTestBase {
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
- write.write(rowData(1, 10, 100L));
- write.write(rowData(1, 20, 200L));
- write.write(rowData(1, 40, 400L));
- commit.commit(0, write.prepareCommit(true, 0));
+ for (int i = 0; i < 5; i++) {
+ write.write(rowData(1, 10, 102L));
+ write.write(rowData(1, 20, 201L));
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(i, write.prepareCommit(true, i));
+ }
- write.write(rowData(1, 10, 101L));
- write.write(rowData(1, 30, 300L));
- write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
- write.compact(binaryRow(1), 0, true);
- commit.commit(1, write.prepareCommit(true, 1));
+ assertThat(snapshotManager.latestSnapshotId()).isEqualTo(10);
- write.write(rowData(1, 10, 102L));
- write.write(rowData(1, 20, 201L));
- commit.commit(2, write.prepareCommit(true, 2));
-
- assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
-
- CompactedStartingScanner scanner = new CompactedStartingScanner();
+ FullCompactedStartingScanner scanner = new
FullCompactedStartingScanner(3);
StartingScanner.Result result = scanner.scan(snapshotManager,
snapshotSplitReader);
- assertThat(result.snapshotId()).isEqualTo(3);
- assertThat(getResult(table.newRead(), toSplits(result.splits())))
- .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200",
"+I 1|30|300"));
+ assertThat(result.snapshotId()).isEqualTo(8);
write.close();
commit.close();
@@ -68,7 +56,7 @@ public class CompactedStartingScannerTest extends ScannerTestBase {
@Test
public void testNoSnapshot() {
SnapshotManager snapshotManager = table.snapshotManager();
- CompactedStartingScanner scanner = new CompactedStartingScanner();
+ FullCompactedStartingScanner scanner = new
FullCompactedStartingScanner(3);
assertThat(scanner.scan(snapshotManager,
snapshotSplitReader)).isNull();
}
@@ -83,10 +71,22 @@ public class CompactedStartingScannerTest extends ScannerTestBase {
write.write(rowData(1, 40, 400L));
commit.commit(0, write.prepareCommit(true, 0));
- assertThat(snapshotManager.latestSnapshotId()).isEqualTo(1);
+ write.write(rowData(1, 10, 101L));
+ write.write(rowData(1, 30, 300L));
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+ write.compact(binaryRow(1), 0, true);
+ commit.commit(1, write.prepareCommit(true, 1));
- CompactedStartingScanner scanner = new CompactedStartingScanner();
- assertThat(scanner.scan(snapshotManager,
snapshotSplitReader)).isNull();
+ write.write(rowData(1, 10, 102L));
+ write.write(rowData(1, 20, 201L));
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
+
+ FullCompactedStartingScanner scanner = new
FullCompactedStartingScanner(3);
+
+ // No compact snapshot found, reading from the latest snapshot
+ assertThat(scanner.scan(snapshotManager,
snapshotSplitReader).snapshotId()).isEqualTo(4);
write.close();
commit.close();
diff --git a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 6c1627998..59cc98ce4 100644
--- a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -51,13 +51,6 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
assertThat(batchSql("SELECT * FROM T")).isEmpty();
}
- @Test
- public void testCompactedScanModeEmpty() {
- batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
- assertThat(batchSql("SELECT * FROM T /*+
OPTIONS('scan.mode'='compacted-full') */"))
- .isEmpty();
- }
-
@Test
public void testTimeTravelRead() throws InterruptedException {
batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
diff --git a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 6c1627998..59cc98ce4 100644
--- a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -51,13 +51,6 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
assertThat(batchSql("SELECT * FROM T")).isEmpty();
}
- @Test
- public void testCompactedScanModeEmpty() {
- batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
- assertThat(batchSql("SELECT * FROM T /*+
OPTIONS('scan.mode'='compacted-full') */"))
- .isEmpty();
- }
-
@Test
public void testTimeTravelRead() throws InterruptedException {
batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 9ca680e46..4c8e1967d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -46,13 +46,6 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
assertThat(batchSql("SELECT * FROM T")).isEmpty();
}
- @Test
- public void testCompactedScanModeEmpty() {
- batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
- assertThat(batchSql("SELECT * FROM T /*+
OPTIONS('scan.mode'='compacted-full') */"))
- .isEmpty();
- }
-
@Test
public void testTimeTravelRead() throws InterruptedException {
batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");