This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 537c625fef [core] Support read increment changelog and delta between
two tag (#6324)
537c625fef is described below
commit 537c625fef59e3be9a5815d5b4fded22898d35e4
Author: wangwj <[email protected]>
AuthorDate: Mon Oct 13 14:34:47 2025 +0800
[core] Support read increment changelog and delta between two tag (#6324)
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 11 +++
.../paimon/table/source/AbstractDataTableScan.java | 16 +++-
.../IncrementalDeltaStartingScannerTest.java | 97 +++++++++++++---------
4 files changed, 88 insertions(+), 42 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index fcd9158207..5d95f775aa 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -566,6 +566,12 @@ under the License.
<td><p>Enum</p></td>
<td>Scan kind when Read incremental changes between start snapshot
(exclusive) and end snapshot (inclusive). <br /><br />Possible
values:<ul><li>"auto": Scan changelog files for the table which produces
changelog files. Otherwise, scan newly changed files.</li><li>"delta": Scan
newly changed files between snapshots.</li><li>"changelog": Scan changelog
files between snapshots.</li><li>"diff": Get diff by comparing data of end
snapshot with data of start snapshot.</li></ul></td>
</tr>
+ <tr>
+ <td><h5>incremental-between-tag-to-snapshot</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to read incremental changes between the snapshots
referenced by the start and end tags, according to the incremental-between scan mode, instead of comparing the data of the two tags.</td>
+ </tr>
<tr>
<td><h5>incremental-between-timestamp</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index f4581098ed..00118784af 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1317,6 +1317,13 @@ public class CoreOptions implements Serializable {
+ "If the tag doesn't exist or the earlier
tag doesn't exist, return empty. "
+ "This option requires
'tag.creation-period' and 'tag.period-formatter' configured.");
+ public static final ConfigOption<Boolean>
INCREMENTAL_BETWEEN_TAG_TO_SNAPSHOT =
+ key("incremental-between-tag-to-snapshot")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to read incremental changes between the
snapshots referenced by the start and end tags, according to the incremental-between scan mode, instead of comparing the data of the two tags.");
+
public static final ConfigOption<Boolean> END_INPUT_CHECK_PARTITION_EXPIRE
=
key("end-input.check-partition-expire")
.booleanType()
@@ -2673,6 +2680,10 @@ public class CoreOptions implements Serializable {
return options.get(INCREMENTAL_TO_AUTO_TAG);
}
+ public boolean incrementalBetweenTagToSnapshot() {
+ return options.get(INCREMENTAL_BETWEEN_TAG_TO_SNAPSHOT);
+ }
+
public Integer scanManifestParallelism() {
return options.get(SCAN_MANIFEST_PARALLELISM);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 67acb44066..19b9220278 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -69,6 +69,8 @@ import java.util.Optional;
import java.util.TimeZone;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
+import static
org.apache.paimon.CoreOptions.IncrementalBetweenScanMode.CHANGELOG;
+import static org.apache.paimon.CoreOptions.IncrementalBetweenScanMode.DELTA;
import static org.apache.paimon.CoreOptions.IncrementalBetweenScanMode.DIFF;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -338,8 +340,18 @@ abstract class AbstractDataTableScan implements
DataTableScan {
Optional<Tag> endTag =
tagManager.get(incrementalBetween.getRight());
if (startTag.isPresent() && endTag.isPresent()) {
- return IncrementalDiffStartingScanner.betweenTags(
- startTag.get(), endTag.get(), snapshotManager,
incrementalBetween);
+ if (options.incrementalBetweenTagToSnapshot()) {
+ CoreOptions.IncrementalBetweenScanMode scanMode =
+ options.incrementalBetweenScanMode();
+ return IncrementalDeltaStartingScanner.betweenSnapshotIds(
+ startTag.get().id(),
+ endTag.get().id(),
+ snapshotManager,
+ toSnapshotScanMode(scanMode));
+ } else {
+ return IncrementalDiffStartingScanner.betweenTags(
+ startTag.get(), endTag.get(), snapshotManager,
incrementalBetween);
+ }
} else {
long startId, endId;
try {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
index 5da9cbbf67..1b3f9f31e3 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
@@ -37,6 +37,7 @@ import java.util.Map;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_SCAN_MODE;
+import static
org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TAG_TO_SNAPSHOT;
import static
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
@@ -46,28 +47,8 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
public class IncrementalDeltaStartingScannerTest extends ScannerTestBase {
@Test
- public void testScan() throws Exception {
- SnapshotManager snapshotManager = table.snapshotManager();
- StreamTableWrite write =
- table.newWrite(commitUser).withIOManager(new
IOManagerImpl(tempDir.toString()));
- StreamTableCommit commit = table.newCommit(commitUser);
-
- write.write(rowData(1, 10, 100L));
- write.write(rowData(2, 20, 200L));
- write.write(rowData(3, 40, 400L));
- write.compact(binaryRow(1), 0, false);
- commit.commit(0, write.prepareCommit(true, 0));
-
- write.write(rowData(1, 10, 100L));
- write.write(rowData(2, 20, 200L));
- write.write(rowData(3, 40, 500L));
- write.compact(binaryRow(1), 0, false);
- commit.commit(1, write.prepareCommit(true, 1));
-
- write.close();
- commit.close();
-
- assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
+ public void testScanDeltaBySnapshotId() throws Exception {
+ writeDataToTable();
Map<String, String> dynamicOptions = new HashMap<>();
dynamicOptions.put(INCREMENTAL_BETWEEN.key(), "1,4");
@@ -87,8 +68,62 @@ public class IncrementalDeltaStartingScannerTest extends
ScannerTestBase {
.hasSameElementsAs(Arrays.asList("+I 2|20|200", "+I 1|10|100",
"+I 3|40|500"));
}
+ @Test
+ public void testScanDeltaByTag() throws Exception {
+ writeDataToTable();
+
+ table.createTag("tag-from-snapshot-2", 2L);
+ table.createTag("tag-from-snapshot-4", 4L);
+
+ Map<String, String> dynamicOptions = new HashMap<>();
+ dynamicOptions.put(INCREMENTAL_BETWEEN.key(),
"tag-from-snapshot-2,tag-from-snapshot-4");
+ dynamicOptions.put(INCREMENTAL_BETWEEN_TAG_TO_SNAPSHOT.key(), "true");
+ dynamicOptions.put(INCREMENTAL_BETWEEN_SCAN_MODE.key(), "delta");
+ List<Split> splits =
table.copy(dynamicOptions).newScan().plan().splits();
+ assertThat(getResult(table.newRead(), splits))
+ .hasSameElementsAs(Arrays.asList("+I 2|20|200", "+I 1|10|100",
"+I 3|40|500"));
+ }
+
+ @Test
+ public void testScanChangelogByTag() throws Exception {
+ writeDataToTable();
+
+ table.createTag("tag-from-snapshot-2", 2L);
+ table.createTag("tag-from-snapshot-4", 4L);
+
+ Map<String, String> dynamicOptions = new HashMap<>();
+ dynamicOptions.put(INCREMENTAL_BETWEEN.key(),
"tag-from-snapshot-2,tag-from-snapshot-4");
+ dynamicOptions.put(INCREMENTAL_BETWEEN_TAG_TO_SNAPSHOT.key(), "true");
+ dynamicOptions.put(INCREMENTAL_BETWEEN_SCAN_MODE.key(), "changelog");
+ List<Split> splits =
table.copy(dynamicOptions).newScan().plan().splits();
+ assertThat(getResult(table.newRead(), splits))
+ .hasSameElementsAs(Arrays.asList("-U 3|40|400", "+U
3|40|500"));
+ }
+
@Test
public void testIllegalScanSnapshotId() throws Exception {
+ writeDataToTable();
+
+ // Allowed starting snapshotId to be equal to the earliest snapshotId
-1.
+ assertThatNoException()
+ .isThrownBy(
+ () ->
+
IncrementalDeltaStartingScanner.betweenSnapshotIds(
+ 0, 4, table.snapshotManager(),
ScanMode.DELTA)
+ .scan(snapshotReader));
+
+ assertThatThrownBy(
+ () ->
+
IncrementalDeltaStartingScanner.betweenSnapshotIds(
+ 1, 5, table.snapshotManager(),
ScanMode.DELTA)
+ .scan(snapshotReader))
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "The specified scan snapshotId range [1, 5] is
out of available snapshotId range [1, 4]."));
+ }
+
+ private void writeDataToTable() throws Exception {
SnapshotManager snapshotManager = table.snapshotManager();
StreamTableWrite write =
table.newWrite(commitUser).withIOManager(new
IOManagerImpl(tempDir.toString()));
@@ -110,24 +145,6 @@ public class IncrementalDeltaStartingScannerTest extends
ScannerTestBase {
commit.close();
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
-
- // Allowed starting snapshotId to be equal to the earliest snapshotId
-1.
- assertThatNoException()
- .isThrownBy(
- () ->
-
IncrementalDeltaStartingScanner.betweenSnapshotIds(
- 0, 4, snapshotManager,
ScanMode.DELTA)
- .scan(snapshotReader));
-
- assertThatThrownBy(
- () ->
-
IncrementalDeltaStartingScanner.betweenSnapshotIds(
- 1, 5, snapshotManager,
ScanMode.DELTA)
- .scan(snapshotReader))
- .satisfies(
- anyCauseMatches(
- IllegalArgumentException.class,
- "The specified scan snapshotId range [1, 5] is
out of available snapshotId range [1, 4]."));
}
@Override