This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 75166c2cb1 [core] Support diff scan mode in incremental query (#5321)
75166c2cb1 is described below
commit 75166c2cb1c7a506f287717ba3505018a3689aa1
Author: yuzelin <[email protected]>
AuthorDate: Fri Mar 21 19:06:48 2025 +0800
[core] Support diff scan mode in incremental query (#5321)
---
.../shortcodes/generated/core_configuration.html | 2 +-
.../main/java/org/apache/paimon/CoreOptions.java | 23 +---
.../paimon/table/source/AbstractDataTableScan.java | 123 ++++++++++++---------
.../snapshot/EmptyResultStartingScanner.java | 2 +-
...r.java => IncrementalDeltaStartingScanner.java} | 37 +++++--
...er.java => IncrementalDiffStartingScanner.java} | 68 ++++++++++--
.../IncrementalTimeStampStartingScanner.java | 76 -------------
.../table/source/snapshot/TimeTravelUtil.java | 4 +-
...va => IncrementalDeltaStartingScannerTest.java} | 10 +-
.../apache/paimon/flink/BatchFileStoreITCase.java | 40 ++++++-
10 files changed, 208 insertions(+), 177 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 746d83efbb..15d2ce819d 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -408,7 +408,7 @@ under the License.
<td><h5>incremental-between-scan-mode</h5></td>
<td style="word-wrap: break-word;">auto</td>
<td><p>Enum</p></td>
- <td>Scan kind when Read incremental changes between start snapshot
(exclusive) and end snapshot (inclusive). <br /><br />Possible
values:<ul><li>"auto": Scan changelog files for the table which produces
changelog files. Otherwise, scan newly changed files.</li><li>"delta": Scan
newly changed files between snapshots.</li><li>"changelog": Scan changelog
files between snapshots.</li></ul></td>
+ <td>Scan kind when Read incremental changes between start snapshot
(exclusive) and end snapshot (inclusive). <br /><br />Possible
values:<ul><li>"auto": Scan changelog files for the table which produces
changelog files. Otherwise, scan newly changed files.</li><li>"delta": Scan
newly changed files between snapshots.</li><li>"changelog": Scan changelog
files between snapshots.</li><li>"diff": Get diff by comparing data of end
snapshot with data of start snapshot.</li></ul></td>
</tr>
<tr>
<td><h5>incremental-between-timestamp</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 0b3e24eb5d..e250293446 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2925,7 +2925,8 @@ public class CoreOptions implements Serializable {
"auto",
"Scan changelog files for the table which produces changelog
files. Otherwise, scan newly changed files."),
DELTA("delta", "Scan newly changed files between snapshots."),
- CHANGELOG("changelog", "Scan changelog files between snapshots.");
+ CHANGELOG("changelog", "Scan changelog files between snapshots."),
+ DIFF("diff", "Get diff by comparing data of end snapshot with data of
start snapshot.");
private final String value;
private final String description;
@@ -2944,26 +2945,6 @@ public class CoreOptions implements Serializable {
public InlineElement getDescription() {
return text(description);
}
-
- public String getValue() {
- return value;
- }
-
- @VisibleForTesting
- public static IncrementalBetweenScanMode fromValue(String value) {
- for (IncrementalBetweenScanMode formatType :
IncrementalBetweenScanMode.values()) {
- if (formatType.value.equals(value)) {
- return formatType;
- }
- }
- throw new IllegalArgumentException(
- String.format(
- "Invalid format type %s, only support [%s]",
- value,
- StringUtils.join(
-
Arrays.stream(IncrementalBetweenScanMode.values()).iterator(),
- ",")));
- }
}
/**
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 83c2ea5c72..37cd0e378f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -34,12 +34,12 @@ import
org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotFullStartin
import
org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotStartingScanner;
import
org.apache.paimon.table.source.snapshot.ContinuousFromTimestampStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousLatestStartingScanner;
+import org.apache.paimon.table.source.snapshot.EmptyResultStartingScanner;
import org.apache.paimon.table.source.snapshot.FileCreationTimeStartingScanner;
import org.apache.paimon.table.source.snapshot.FullCompactedStartingScanner;
import org.apache.paimon.table.source.snapshot.FullStartingScanner;
-import org.apache.paimon.table.source.snapshot.IncrementalStartingScanner;
-import org.apache.paimon.table.source.snapshot.IncrementalTagStartingScanner;
-import
org.apache.paimon.table.source.snapshot.IncrementalTimeStampStartingScanner;
+import org.apache.paimon.table.source.snapshot.IncrementalDeltaStartingScanner;
+import org.apache.paimon.table.source.snapshot.IncrementalDiffStartingScanner;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import
org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner;
@@ -61,7 +61,7 @@ import java.util.Map;
import java.util.Optional;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
-import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
+import static org.apache.paimon.CoreOptions.IncrementalBetweenScanMode.DIFF;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -228,56 +228,22 @@ abstract class AbstractDataTableScan implements
DataTableScan {
}
private StartingScanner createIncrementalStartingScanner(SnapshotManager
snapshotManager) {
- CoreOptions.IncrementalBetweenScanMode scanType =
options.incrementalBetweenScanMode();
- ScanMode scanMode;
- switch (scanType) {
- case AUTO:
- scanMode =
- options.changelogProducer() == ChangelogProducer.NONE
- ? ScanMode.DELTA
- : ScanMode.CHANGELOG;
- break;
- case DELTA:
- scanMode = ScanMode.DELTA;
- break;
- case CHANGELOG:
- scanMode = ScanMode.CHANGELOG;
- break;
- default:
- throw new UnsupportedOperationException(
- "Unknown incremental scan type " + scanType.name());
- }
-
Options conf = options.toConfiguration();
- TagManager tagManager =
- new TagManager(
- snapshotManager.fileIO(),
- snapshotManager.tablePath(),
- snapshotManager.branch());
+
if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN)) {
Pair<String, String> incrementalBetween =
options.incrementalBetween();
+
+ TagManager tagManager =
+ new TagManager(
+ snapshotManager.fileIO(),
+ snapshotManager.tablePath(),
+ snapshotManager.branch());
Optional<Tag> startTag =
tagManager.get(incrementalBetween.getLeft());
Optional<Tag> endTag =
tagManager.get(incrementalBetween.getRight());
- if (startTag.isPresent() && endTag.isPresent()) {
- Snapshot start = startTag.get().trimToSnapshot();
- Snapshot end = endTag.get().trimToSnapshot();
- LOG.info(
- "{} start and end are parsed to tag with snapshot id
{} to {}.",
- INCREMENTAL_BETWEEN.key(),
- start.id(),
- end.id());
-
- if (end.id() <= start.id()) {
- throw new IllegalArgumentException(
- String.format(
- "Tag end %s with snapshot id %s should be
larger than tag start %s with snapshot id %s",
- incrementalBetween.getRight(),
- end.id(),
- incrementalBetween.getLeft(),
- start.id()));
- }
- return new IncrementalTagStartingScanner(snapshotManager,
start, end);
+ if (startTag.isPresent() && endTag.isPresent()) {
+ return IncrementalDiffStartingScanner.betweenTags(
+ startTag.get(), endTag.get(), snapshotManager,
incrementalBetween);
} else {
long startId, endId;
try {
@@ -290,20 +256,67 @@ abstract class AbstractDataTableScan implements
DataTableScan {
+ "Please set two tags or two
snapshot Ids.",
incrementalBetween.getLeft(),
incrementalBetween.getRight()));
}
- return new IncrementalStartingScanner(snapshotManager,
startId, endId, scanMode);
+
+ CoreOptions.IncrementalBetweenScanMode scanMode =
+ options.incrementalBetweenScanMode();
+ return scanMode == DIFF
+ ? IncrementalDiffStartingScanner.betweenSnapshotIds(
+ startId, endId, snapshotManager)
+ : new IncrementalDeltaStartingScanner(
+ snapshotManager, startId, endId,
toSnapshotScanMode(scanMode));
}
} else if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)) {
Pair<Long, Long> incrementalBetween =
options.incrementalBetweenTimestamp();
- return new IncrementalTimeStampStartingScanner(
- snapshotManager,
- incrementalBetween.getLeft(),
- incrementalBetween.getRight(),
- scanMode);
+
+ Snapshot earliestSnapshot = snapshotManager.earliestSnapshot();
+ Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+ if (earliestSnapshot == null || latestSnapshot == null) {
+ return new EmptyResultStartingScanner(snapshotManager);
+ }
+
+ long startTimestamp = incrementalBetween.getLeft();
+ long endTimestamp = incrementalBetween.getRight();
+ checkArgument(
+ endTimestamp > startTimestamp,
+ "Ending timestamp %s should be larger than starting
timestamp %s.",
+ endTimestamp,
+ startTimestamp);
+ if (startTimestamp > latestSnapshot.timeMillis()
+ || endTimestamp < earliestSnapshot.timeMillis()) {
+ return new EmptyResultStartingScanner(snapshotManager);
+ }
+
+ CoreOptions.IncrementalBetweenScanMode scanMode =
options.incrementalBetweenScanMode();
+
+ return scanMode == DIFF
+ ? IncrementalDiffStartingScanner.betweenTimestamps(
+ startTimestamp, endTimestamp, snapshotManager)
+ : IncrementalDeltaStartingScanner.betweenTimestamps(
+ startTimestamp,
+ endTimestamp,
+ snapshotManager,
+ toSnapshotScanMode(scanMode));
} else if (conf.contains(CoreOptions.INCREMENTAL_TO_AUTO_TAG)) {
String endTag = options.incrementalToAutoTag();
- return IncrementalTagStartingScanner.create(snapshotManager,
endTag, options);
+ return
IncrementalDiffStartingScanner.toEndAutoTag(snapshotManager, endTag, options);
} else {
throw new UnsupportedOperationException("Unknown incremental read
mode.");
}
}
+
+ /**
+ * Maps the configured incremental-between scan mode to the snapshot-level {@link ScanMode}.
+ *
+ * <p>AUTO resolves to DELTA when the table's changelog producer is NONE and to CHANGELOG
+ * otherwise. Any mode without a case below (DIFF, for instance) is rejected.
+ *
+ * @param scanMode the incremental-between scan mode to translate
+ * @return {@link ScanMode#DELTA} or {@link ScanMode#CHANGELOG}
+ * @throws UnsupportedOperationException if the mode has no case in the switch
+ */
+ private ScanMode toSnapshotScanMode(CoreOptions.IncrementalBetweenScanMode
scanMode) {
+ switch (scanMode) {
+ case AUTO:
+ return options.changelogProducer() == ChangelogProducer.NONE
+ ? ScanMode.DELTA
+ : ScanMode.CHANGELOG;
+ case DELTA:
+ return ScanMode.DELTA;
+ case CHANGELOG:
+ return ScanMode.CHANGELOG;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported incremental scan mode " +
scanMode.name());
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/EmptyResultStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/EmptyResultStartingScanner.java
index fc38e272d5..eb7c5c0ef2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/EmptyResultStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/EmptyResultStartingScanner.java
@@ -23,7 +23,7 @@ import org.apache.paimon.utils.SnapshotManager;
/** This scanner always return an empty result. */
public class EmptyResultStartingScanner extends AbstractStartingScanner {
- EmptyResultStartingScanner(SnapshotManager snapshotManager) {
+ public EmptyResultStartingScanner(SnapshotManager snapshotManager) {
super(snapshotManager);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
similarity index 84%
rename from
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
rename to
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
index 9bfb54f2cf..6f18dde393 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
@@ -19,7 +19,6 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.Snapshot.CommitKind;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
@@ -52,15 +51,18 @@ import java.util.stream.LongStream;
import static
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-/** {@link StartingScanner} for incremental changes by snapshot. */
-public class IncrementalStartingScanner extends AbstractStartingScanner {
+/**
+ * Get incremental data by reading delta or changelog files from snapshots
between start and end.
+ */
+public class IncrementalDeltaStartingScanner extends AbstractStartingScanner {
- private static final Logger LOG =
LoggerFactory.getLogger(IncrementalStartingScanner.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(IncrementalDeltaStartingScanner.class);
private final long endingSnapshotId;
private final ScanMode scanMode;
- public IncrementalStartingScanner(
+ public IncrementalDeltaStartingScanner(
SnapshotManager snapshotManager, long start, long end, ScanMode
scanMode) {
super(snapshotManager);
this.startingSnapshotId = start;
@@ -89,13 +91,13 @@ public class IncrementalStartingScanner extends
AbstractStartingScanner {
Snapshot snapshot = snapshotManager.snapshot(id);
switch (scanMode) {
case DELTA:
- if (snapshot.commitKind() !=
CommitKind.APPEND) {
+ if (snapshot.commitKind() !=
Snapshot.CommitKind.APPEND) {
// ignore COMPACT and OVERWRITE
return Collections.emptyList();
}
break;
case CHANGELOG:
- if (snapshot.commitKind() ==
CommitKind.OVERWRITE) {
+ if (snapshot.commitKind() ==
Snapshot.CommitKind.OVERWRITE) {
// ignore OVERWRITE
return Collections.emptyList();
}
@@ -156,7 +158,7 @@ public class IncrementalStartingScanner extends
AbstractStartingScanner {
*
* @return If the check passes return empty.
*/
- public Optional<Result> checkScanSnapshotIdValidity() {
+ private Optional<Result> checkScanSnapshotIdValidity() {
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
Long latestSnapshotId = snapshotManager.latestSnapshotId();
@@ -185,4 +187,23 @@ public class IncrementalStartingScanner extends
AbstractStartingScanner {
return Optional.empty();
}
+
+ /**
+ * Creates a delta/changelog scanner for the snapshot id range derived from two timestamps.
+ *
+ * <p>The start id is the id of the snapshot returned by {@code earlierOrEqualTimeMills} for
+ * {@code startTimestamp}, or {@code earliestSnapshot.id() - 1} when that lookup returns null or
+ * the earliest snapshot is newer than {@code startTimestamp}. The end id is the id of the
+ * snapshot returned for {@code endTimestamp}, falling back to the latest snapshot id when the
+ * lookup returns null.
+ *
+ * <p>NOTE(review): {@code earliestSnapshot()} and {@code latestSnapshot()} are dereferenced
+ * without a null check. The call site in AbstractDataTableScan returns an empty-result scanner
+ * before calling this method when either is null; confirm that any other caller does the same.
+ *
+ * @param startTimestamp timestamp used to resolve the starting snapshot id
+ * @param endTimestamp timestamp used to resolve the ending snapshot id
+ * @param snapshotManager used to look up snapshots by timestamp
+ * @param scanMode scan mode handed to the created scanner
+ * @return a scanner over the resolved start and end snapshot ids
+ */
+ public static IncrementalDeltaStartingScanner betweenTimestamps(
+ long startTimestamp,
+ long endTimestamp,
+ SnapshotManager snapshotManager,
+ ScanMode scanMode) {
+ Snapshot startingSnapshot =
snapshotManager.earlierOrEqualTimeMills(startTimestamp);
+ Snapshot earliestSnapshot = snapshotManager.earliestSnapshot();
+ // When the earliest snapshot is newer than startTimestamp, start from earliest.id() - 1
+ // so that the earliest snapshot itself is included in the scanned range.
+ long startId =
+ (startingSnapshot == null || earliestSnapshot.timeMillis() >
startTimestamp)
+ ? earliestSnapshot.id() - 1
+ : startingSnapshot.id();
+
+ Snapshot endSnapshot =
snapshotManager.earlierOrEqualTimeMills(endTimestamp);
+ long endId = endSnapshot == null ?
snapshotManager.latestSnapshot().id() : endSnapshot.id();
+
+ return new IncrementalDeltaStartingScanner(snapshotManager, startId,
endId, scanMode);
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java
similarity index 60%
rename from
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
rename to
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java
index 388b36fb28..942ee23ffa 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java
@@ -35,24 +35,25 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-/** {@link StartingScanner} for incremental changes by tag. */
-public class IncrementalTagStartingScanner extends AbstractStartingScanner {
+/** Get incremental data by {@link SnapshotReader#readIncrementalDiff}. */
+public class IncrementalDiffStartingScanner extends AbstractStartingScanner {
- private static final Logger LOG =
LoggerFactory.getLogger(IncrementalTagStartingScanner.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(IncrementalDiffStartingScanner.class);
private final Snapshot start;
private final Snapshot end;
- public IncrementalTagStartingScanner(
+ public IncrementalDiffStartingScanner(
SnapshotManager snapshotManager, Snapshot start, Snapshot end) {
super(snapshotManager);
this.start = start;
this.end = end;
this.startingSnapshotId = start.id();
- TimeTravelUtil.checkRescaleBucketForIncrementalTagQuery(
+ TimeTravelUtil.checkRescaleBucketForIncrementalDiffQuery(
new SchemaManager(
snapshotManager.fileIO(),
snapshotManager.tablePath(),
@@ -66,7 +67,60 @@ public class IncrementalTagStartingScanner extends
AbstractStartingScanner {
return
StartingScanner.fromPlan(reader.withSnapshot(end).readIncrementalDiff(start));
}
- public static AbstractStartingScanner create(
+ /**
+ * Creates a diff scanner between the snapshots that two tags are trimmed to.
+ *
+ * <p>Logs the resolved snapshot ids, then requires the end tag's snapshot id to be strictly
+ * larger than the start tag's; otherwise the {@code checkArgument} precondition fails.
+ *
+ * @param startTag tag whose snapshot is the start of the diff
+ * @param endTag tag whose snapshot is the end of the diff
+ * @param snapshotManager passed through to the created scanner
+ * @param incrementalBetween the configured (start, end) names; only used in the error message
+ * @return a diff scanner from the start tag's snapshot to the end tag's snapshot
+ */
+ public static IncrementalDiffStartingScanner betweenTags(
+ Tag startTag,
+ Tag endTag,
+ SnapshotManager snapshotManager,
+ Pair<String, String> incrementalBetween) {
+ Snapshot start = startTag.trimToSnapshot();
+ Snapshot end = endTag.trimToSnapshot();
+
+ LOG.info(
+ "{} start and end are parsed to tag with snapshot id {} to
{}.",
+ INCREMENTAL_BETWEEN.key(),
+ start.id(),
+ end.id());
+
+ checkArgument(
+ end.id() > start.id(),
+ "Tag end %s with snapshot id %s should be larger than tag
start %s with snapshot id %s",
+ incrementalBetween.getRight(),
+ end.id(),
+ incrementalBetween.getLeft(),
+ start.id());
+
+ return new IncrementalDiffStartingScanner(snapshotManager, start, end);
+ }
+
+ /**
+ * Creates a diff scanner between two snapshots identified by their ids.
+ *
+ * <p>The end id must be strictly larger than the start id; otherwise the
+ * {@code checkArgument} precondition fails with a message naming both ids.
+ *
+ * @param startId id of the snapshot used as the start of the diff
+ * @param endId id of the snapshot used as the end of the diff
+ * @param snapshotManager used to load both snapshots and passed to the created scanner
+ * @return a diff scanner from the start snapshot to the end snapshot
+ */
+ public static IncrementalDiffStartingScanner betweenSnapshotIds(
+ long startId, long endId, SnapshotManager snapshotManager) {
+ checkArgument(
+ endId > startId,
+ // One placeholder per argument, in argument order, so each id is reported
+ // next to the label it belongs to.
+ "Ending snapshotId %s should be larger than starting snapshotId %s.",
+ endId,
+ startId);
+
+ Snapshot start = snapshotManager.snapshot(startId);
+ Snapshot end = snapshotManager.snapshot(endId);
+ return new IncrementalDiffStartingScanner(snapshotManager, start, end);
+ }
+
+ /**
+ * Creates a diff scanner between the snapshots resolved from two timestamps.
+ *
+ * <p>Each timestamp is resolved with {@code earlierOrEqualTimeMills}. When the start lookup
+ * returns null the earliest snapshot is used; when the end lookup returns null the latest
+ * snapshot is used.
+ *
+ * <p>NOTE(review): unlike {@code betweenTags} and {@code betweenSnapshotIds}, this method does
+ * not check that the resolved end snapshot is newer than the resolved start snapshot. Both
+ * timestamps may resolve to the same snapshot; confirm the resulting diff is the intended
+ * outcome in that case.
+ *
+ * @param startTimestamp timestamp used to resolve the start snapshot
+ * @param endTimestamp timestamp used to resolve the end snapshot
+ * @param snapshotManager used to look up snapshots and passed to the created scanner
+ * @return a diff scanner from the resolved start snapshot to the resolved end snapshot
+ */
+ public static IncrementalDiffStartingScanner betweenTimestamps(
+ long startTimestamp, long endTimestamp, SnapshotManager
snapshotManager) {
+ Snapshot startSnapshot =
snapshotManager.earlierOrEqualTimeMills(startTimestamp);
+ if (startSnapshot == null) {
+ startSnapshot = snapshotManager.earliestSnapshot();
+ }
+
+ Snapshot endSnapshot =
snapshotManager.earlierOrEqualTimeMills(endTimestamp);
+ if (endSnapshot == null) {
+ endSnapshot = snapshotManager.latestSnapshot();
+ }
+
+ return new IncrementalDiffStartingScanner(snapshotManager,
startSnapshot, endSnapshot);
+ }
+
+ public static AbstractStartingScanner toEndAutoTag(
SnapshotManager snapshotManager, String endTagName, CoreOptions
options) {
TagPeriodHandler periodHandler = TagPeriodHandler.create(options);
checkArgument(
@@ -104,6 +158,6 @@ public class IncrementalTagStartingScanner extends
AbstractStartingScanner {
LOG.info("Found start tag {} .",
periodHandler.timeToTag(previousTags.get(0).getRight()));
Snapshot start = previousTags.get(0).getLeft().trimToSnapshot();
- return new IncrementalTagStartingScanner(snapshotManager, start, end);
+ return new IncrementalDiffStartingScanner(snapshotManager, start, end);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
deleted file mode 100644
index 26ece79d08..0000000000
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source.snapshot;
-
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.table.source.ScanMode;
-import org.apache.paimon.utils.SnapshotManager;
-
-/** {@link StartingScanner} for incremental changes by timestamp. */
-public class IncrementalTimeStampStartingScanner extends
AbstractStartingScanner {
-
- private final long startTimestamp;
- private final long endTimestamp;
- private final ScanMode scanMode;
-
- public IncrementalTimeStampStartingScanner(
- SnapshotManager snapshotManager,
- long startTimestamp,
- long endTimestamp,
- ScanMode scanMode) {
- super(snapshotManager);
- this.startTimestamp = startTimestamp;
- this.endTimestamp = endTimestamp;
- this.scanMode = scanMode;
- Snapshot startingSnapshot =
snapshotManager.earlierOrEqualTimeMills(startTimestamp);
- if (startingSnapshot != null) {
- this.startingSnapshotId = startingSnapshot.id();
- }
- }
-
- @Override
- public Result scan(SnapshotReader reader) {
- Snapshot earliestSnapshot = snapshotManager.earliestSnapshot();
- if (earliestSnapshot == null) {
- return new NoSnapshot();
- }
-
- Snapshot latestSnapshot = snapshotManager.latestSnapshot();
- if (startTimestamp > latestSnapshot.timeMillis()
- || endTimestamp < earliestSnapshot.timeMillis()) {
- return new NoSnapshot();
- }
- // in org.apache.paimon.utils.SnapshotManager.earlierOrEqualTimeMills
- // 1. if earliestSnapshotId or latestSnapshotId is null
startingSnapshotId will be null
- // 2. if earliestSnapShot.timeMillis() > startTimestamp
startingSnapshotId will be
- // earliestSnapShotId
- // if earliestSnapShot.timeMillis() > startTimestamp we should
include the earliestSnapShot
- // data
- Long startSnapshotId =
- (startingSnapshotId == null || earliestSnapshot.timeMillis() >
startTimestamp)
- ? earliestSnapshot.id() - 1
- : startingSnapshotId;
- Snapshot endSnapshot =
snapshotManager.earlierOrEqualTimeMills(endTimestamp);
- Long endSnapshotId = (endSnapshot == null) ? latestSnapshot.id() :
endSnapshot.id();
- IncrementalStartingScanner incrementalStartingScanner =
- new IncrementalStartingScanner(
- snapshotManager, startSnapshotId, endSnapshotId,
scanMode);
- return incrementalStartingScanner.scan(reader);
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
index 140309a198..cf6feaa3c1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
@@ -238,7 +238,7 @@ public class TimeTravelUtil {
}
}
- public static void checkRescaleBucketForIncrementalTagQuery(
+ public static void checkRescaleBucketForIncrementalDiffQuery(
SchemaManager schemaManager, Snapshot start, Snapshot end) {
if (start.schemaId() != end.schemaId()) {
int startBucketNumber = bucketNumber(schemaManager,
start.schemaId());
@@ -248,7 +248,7 @@ public class TimeTravelUtil {
start.id(),
end.id(),
String.format(
- "The bucket number of two tags are different
(%s, %s), which is not supported in incremental tag query.",
+ "The bucket number of two snapshots are
different (%s, %s), which is not supported in incremental diff query.",
startBucketNumber, endBucketNumber));
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
similarity index 94%
rename from
paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScannerTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
index 2da2f942d2..100b80c93f 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
@@ -42,8 +42,8 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Tests for {@link IncrementalStartingScanner}. */
-public class IncrementalStartingScannerTest extends ScannerTestBase {
+/** Tests for {@link IncrementalDeltaStartingScanner}. */
+public class IncrementalDeltaStartingScannerTest extends ScannerTestBase {
@Test
public void testScan() throws Exception {
@@ -110,14 +110,14 @@ public class IncrementalStartingScannerTest extends
ScannerTestBase {
assertThatNoException()
.isThrownBy(
() ->
- new IncrementalStartingScanner(
+ new IncrementalDeltaStartingScanner(
snapshotManager, 0, 4,
ScanMode.DELTA)
.scan(snapshotReader));
// Starting snapshotId must less than ending snapshotId.
assertThatThrownBy(
() ->
- new IncrementalStartingScanner(
+ new IncrementalDeltaStartingScanner(
snapshotManager, 4, 3,
ScanMode.DELTA)
.scan(snapshotReader))
.satisfies(
@@ -127,7 +127,7 @@ public class IncrementalStartingScannerTest extends
ScannerTestBase {
assertThatThrownBy(
() ->
- new IncrementalStartingScanner(
+ new IncrementalDeltaStartingScanner(
snapshotManager, 1, 5,
ScanMode.DELTA)
.scan(snapshotReader))
.satisfies(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index b16ef18deb..8d55c0e647 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -30,6 +30,7 @@ import org.apache.paimon.utils.SnapshotNotExistException;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
@@ -694,7 +695,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase
{
.satisfies(
anyCauseMatches(
TimeTravelUtil.InconsistentTagBucketException.class,
- "The bucket number of two tags are
different (1, 2), which is not supported in incremental tag query."));
+ "The bucket number of two snapshots are
different (1, 2), which is not supported in incremental diff query."));
}
}
@@ -735,4 +736,41 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
assertThat(sql("SELECT * FROM T /*+
OPTIONS('incremental-between-timestamp'='0,1') */"))
.isEmpty();
}
+
+ /**
+ * Reads the same range ('incremental-between' = '1,8') through the audit_log system table,
+ * first with 'incremental-between-scan-mode' = 'diff' and then with 'delta'. The diff read is
+ * expected to return only the insert of (3, 'C'); the delta read is expected to return the
+ * delete of (2, 'B') as well.
+ */
+ @Test
+ public void testIncrementScanMode() throws Exception {
+ sql(
+ "CREATE TABLE test_scan_mode (id INT PRIMARY KEY NOT ENFORCED,
v STRING) WITH ('changelog-producer' = 'lookup')");
+
+ // snapshot 1,2
+ sql("INSERT INTO test_scan_mode VALUES (1, 'A')");
+ // snapshot 3,4
+ sql("INSERT INTO test_scan_mode VALUES (2, 'B')");
+
+ // snapshot 5,6
+ String dataId =
+ TestValuesTableFactory.registerData(
+ Collections.singletonList(Row.ofKind(RowKind.DELETE,
2, "B")));
+ sEnv.executeSql(
+ "CREATE TEMPORARY TABLE source (id INT, v STRING) "
+ + "WITH ('connector' = 'values', 'bounded' = 'true',
'data-id' = '"
+ + dataId
+ + "')");
+ sEnv.executeSql("INSERT INTO test_scan_mode SELECT * FROM
source").await();
+
+ // snapshot 7,8
+ sql("INSERT INTO test_scan_mode VALUES (3, 'C')");
+
+ List<Row> result =
+ sql(
+ "SELECT * FROM `test_scan_mode$audit_log` "
+ + "/*+
OPTIONS('incremental-between'='1,8','incremental-between-scan-mode'='diff')
*/");
+ assertThat(result).containsExactlyInAnyOrder(Row.of("+I", 3, "C"));
+
+ result =
+ sql(
+ "SELECT * FROM `test_scan_mode$audit_log` "
+ + "/*+
OPTIONS('incremental-between'='1,8','incremental-between-scan-mode'='delta')
*/");
+ assertThat(result).containsExactlyInAnyOrder(Row.of("-D", 2, "B"),
Row.of("+I", 3, "C"));
+ }
}