This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new da4f31ee7 [core] Fix consuming from-snapshot-full on expired snapshot
(#1300)
da4f31ee7 is described below
commit da4f31ee785294b752ca477e06f67d2c0b6b6852
Author: GuojunLi <[email protected]>
AuthorDate: Tue Jun 27 10:13:35 2023 +0800
[core] Fix consuming from-snapshot-full on expired snapshot (#1300)
---
.../table/source/AbstractInnerTableScan.java | 5 +-
.../ContinuousFromSnapshotFullStartingScanner.java | 46 ++++++++++++++++++
.../paimon/table/source/StartupModeTest.java | 54 ++++++++++++++++++++++
3 files changed, 104 insertions(+), 1 deletion(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 2ff81d55d..f2ed2ea85 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
import
org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
+import
org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotFullStartingScanner;
import
org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotStartingScanner;
import
org.apache.paimon.table.source.snapshot.ContinuousFromTimestampStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousLatestStartingScanner;
@@ -126,7 +127,9 @@ public abstract class AbstractInnerTableScan implements
InnerTableScan {
return new
StaticFromTagStartingScanner(options().scanTagName());
}
case FROM_SNAPSHOT_FULL:
- return new
StaticFromSnapshotStartingScanner(options.scanSnapshotId());
+ return isStreaming
+ ? new
ContinuousFromSnapshotFullStartingScanner(options.scanSnapshotId())
+ : new
StaticFromSnapshotStartingScanner(options.scanSnapshotId());
default:
throw new UnsupportedOperationException(
"Unknown startup mode " + startupMode.name());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
new file mode 100644
index 000000000..d524b7883
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.snapshot;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.utils.SnapshotManager;
+
+/**
+ * {@link StartingScanner} for the {@link CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode
+ * of a streaming read.
+ *
+ * <p>The starting plan is a full read ({@link ScanKind#ALL}) of the starting snapshot. If the
+ * requested snapshot id is older than the earliest snapshot still present (for example because it
+ * has already expired), the earliest existing snapshot is used instead.
+ */
+public class ContinuousFromSnapshotFullStartingScanner implements
StartingScanner {
+ /** Starting snapshot id requested by the user; it may already have expired. */
+ private final long snapshotId;
+
+ public ContinuousFromSnapshotFullStartingScanner(long snapshotId) {
+ this.snapshotId = snapshotId;
+ }
+
+ /**
+ * Plans a full read of the starting snapshot, ceiled up to the earliest existing snapshot.
+ *
+ * @return {@code NoSnapshot} if the table has no snapshot yet, otherwise a result built from the
+ *     full plan via {@code StartingScanner.fromPlan}
+ */
+ @Override
+ public Result scan(SnapshotManager snapshotManager, SnapshotReader
snapshotReader) {
+ Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+ if (earliestSnapshotId == null) {
+ // The table has no snapshot yet, so there is nothing to start from.
+ return new NoSnapshot();
+ }
+ // Ceil an expired (too old) id up to the earliest snapshot that still exists.
+ // NOTE(review): an id newer than the latest snapshot is not checked here; confirm how
+ // SnapshotReader#withSnapshot behaves for a snapshot that does not exist yet.
+ long ceiledSnapshotId = Math.max(snapshotId, earliestSnapshotId);
+ return StartingScanner.fromPlan(
+
snapshotReader.withKind(ScanKind.ALL).withSnapshot(ceiledSnapshotId).read());
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
index 4c48a12e6..724d255bb 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
@@ -193,6 +193,33 @@ public class StartupModeTest extends ScannerTestBase {
.isEqualTo(snapshotReader.withSnapshot(2).withKind(ScanKind.ALL).read().splits());
}
+ /**
+ * FROM_SNAPSHOT with an already expired {@code SCAN_SNAPSHOT_ID}: in streaming mode the first plan
+ * is empty and the next plan is the delta of the earliest retained snapshot (2); in batch mode the
+ * plan is empty.
+ */
+ @Test
+ public void testTimeTravelFromExpiredSnapshot() throws Exception {
+ Map<String, String> properties = new HashMap<>();
+ // retain 2 snapshots (both min and max retained are set to 2)
+ properties.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "2");
+ properties.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "2");
+ // start consuming from snapshot 1, which will be expired by the time we scan
+ properties.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "1");
+ initializeTable(StartupMode.FROM_SNAPSHOT, properties);
+ initializeTestData(); // initialize 3 commits; snapshot 1 gets expired
+
+ // streaming mode
+ StreamTableScan dataTableScan = table.newStreamScan();
+ TableScan.Plan firstPlan = dataTableScan.plan();
+ TableScan.Plan secondPlan = dataTableScan.plan();
+
+ assertThat(firstPlan.splits()).isEmpty();
+ // ceiled up to the earliest snapshot id = 2
+ assertThat(secondPlan.splits())
+
.isEqualTo(snapshotReader.withSnapshot(2).withKind(ScanKind.DELTA).read().splits());
+
+ // batch mode: the expired snapshot id yields an empty plan
+ TableScan batchScan = table.newScan();
+ TableScan.Plan plan = batchScan.plan();
+ assertThat(plan.splits()).isEmpty();
+ }
+
@Test
public void testStartFromSnapshotFull() throws Exception {
Map<String, String> properties = new HashMap<>();
@@ -216,6 +243,33 @@ public class StartupModeTest extends ScannerTestBase {
.isEqualTo(snapshotReader.withSnapshot(2).withKind(ScanKind.ALL).read().splits());
}
+ /**
+ * FROM_SNAPSHOT_FULL with an already expired {@code SCAN_SNAPSHOT_ID}: in streaming mode the first
+ * plan is a full read of the earliest retained snapshot (2) and the next plan is the delta of
+ * snapshot 3; in batch mode the plan is empty.
+ */
+ @Test
+ public void testTimeTravelFromExpiredSnapshotFull() throws Exception {
+ Map<String, String> properties = new HashMap<>();
+ // retain 2 snapshots (both min and max retained are set to 2)
+ properties.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "2");
+ properties.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "2");
+ // start consuming from snapshot 1, which will be expired by the time we scan
+ properties.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "1");
+ initializeTable(StartupMode.FROM_SNAPSHOT_FULL, properties);
+ initializeTestData(); // initialize 3 commits; snapshot 1 gets expired
+
+ // streaming mode
+ StreamTableScan dataTableScan = table.newStreamScan();
+ TableScan.Plan firstPlan = dataTableScan.plan();
+ TableScan.Plan secondPlan = dataTableScan.plan();
+
+ // ceiled up to the earliest snapshot id = 2
+ assertThat(firstPlan.splits())
+
.isEqualTo(snapshotReader.withSnapshot(2).withKind(ScanKind.ALL).read().splits());
+ assertThat(secondPlan.splits())
+
.isEqualTo(snapshotReader.withSnapshot(3).withKind(ScanKind.DELTA).read().splits());
+
+ // batch mode: the expired snapshot id yields an empty plan
+ TableScan batchScan = table.newScan();
+ TableScan.Plan plan = batchScan.plan();
+ assertThat(plan.splits()).isEmpty();
+ }
+
private void initializeTable(CoreOptions.StartupMode startupMode) throws
Exception {
initializeTable(startupMode, Collections.emptyMap());
}