This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 537c625fef [core] Support read increment changelog and delta between two tag (#6324)
537c625fef is described below

commit 537c625fef59e3be9a5815d5b4fded22898d35e4
Author: wangwj <[email protected]>
AuthorDate: Mon Oct 13 14:34:47 2025 +0800

    [core] Support read increment changelog and delta between two tag (#6324)
---
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   | 11 +++
 .../paimon/table/source/AbstractDataTableScan.java | 16 +++-
 .../IncrementalDeltaStartingScannerTest.java       | 97 +++++++++++++---------
 4 files changed, 88 insertions(+), 42 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index fcd9158207..5d95f775aa 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -566,6 +566,12 @@ under the License.
             <td><p>Enum</p></td>
             <td>Scan kind when Read incremental changes between start snapshot 
(exclusive) and end snapshot (inclusive). <br /><br />Possible 
values:<ul><li>"auto": Scan changelog files for the table which produces 
changelog files. Otherwise, scan newly changed files.</li><li>"delta": Scan 
newly changed files between snapshots.</li><li>"changelog": Scan changelog 
files between snapshots.</li><li>"diff": Get diff by comparing data of end 
snapshot with data of start snapshot.</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>incremental-between-tag-to-snapshot</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to read incremental changes between the snapshot 
corresponding to the tag.</td>
+        </tr>
         <tr>
             <td><h5>incremental-between-timestamp</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index f4581098ed..00118784af 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1317,6 +1317,13 @@ public class CoreOptions implements Serializable {
                                     + "If the tag doesn't exist or the earlier 
tag doesn't exist, return empty. "
                                     + "This option requires 
'tag.creation-period' and 'tag.period-formatter' configured.");
 
+    public static final ConfigOption<Boolean> 
INCREMENTAL_BETWEEN_TAG_TO_SNAPSHOT =
+            key("incremental-between-tag-to-snapshot")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to read incremental changes between the 
snapshot corresponding to the tag.");
+
     public static final ConfigOption<Boolean> END_INPUT_CHECK_PARTITION_EXPIRE 
=
             key("end-input.check-partition-expire")
                     .booleanType()
@@ -2673,6 +2680,10 @@ public class CoreOptions implements Serializable {
         return options.get(INCREMENTAL_TO_AUTO_TAG);
     }
 
+    public boolean incrementalBetweenTagToSnapshot() {
+        return options.get(INCREMENTAL_BETWEEN_TAG_TO_SNAPSHOT);
+    }
+
     public Integer scanManifestParallelism() {
         return options.get(SCAN_MANIFEST_PARALLELISM);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 67acb44066..19b9220278 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -69,6 +69,8 @@ import java.util.Optional;
 import java.util.TimeZone;
 
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
+import static 
org.apache.paimon.CoreOptions.IncrementalBetweenScanMode.CHANGELOG;
+import static org.apache.paimon.CoreOptions.IncrementalBetweenScanMode.DELTA;
 import static org.apache.paimon.CoreOptions.IncrementalBetweenScanMode.DIFF;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -338,8 +340,18 @@ abstract class AbstractDataTableScan implements 
DataTableScan {
             Optional<Tag> endTag = 
tagManager.get(incrementalBetween.getRight());
 
             if (startTag.isPresent() && endTag.isPresent()) {
-                return IncrementalDiffStartingScanner.betweenTags(
-                        startTag.get(), endTag.get(), snapshotManager, 
incrementalBetween);
+                if (options.incrementalBetweenTagToSnapshot()) {
+                    CoreOptions.IncrementalBetweenScanMode scanMode =
+                            options.incrementalBetweenScanMode();
+                    return IncrementalDeltaStartingScanner.betweenSnapshotIds(
+                            startTag.get().id(),
+                            endTag.get().id(),
+                            snapshotManager,
+                            toSnapshotScanMode(scanMode));
+                } else {
+                    return IncrementalDiffStartingScanner.betweenTags(
+                            startTag.get(), endTag.get(), snapshotManager, 
incrementalBetween);
+                }
             } else {
                 long startId, endId;
                 try {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
index 5da9cbbf67..1b3f9f31e3 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
@@ -37,6 +37,7 @@ import java.util.Map;
 
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_SCAN_MODE;
+import static 
org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TAG_TO_SNAPSHOT;
 import static 
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatNoException;
@@ -46,28 +47,8 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 public class IncrementalDeltaStartingScannerTest extends ScannerTestBase {
 
     @Test
-    public void testScan() throws Exception {
-        SnapshotManager snapshotManager = table.snapshotManager();
-        StreamTableWrite write =
-                table.newWrite(commitUser).withIOManager(new 
IOManagerImpl(tempDir.toString()));
-        StreamTableCommit commit = table.newCommit(commitUser);
-
-        write.write(rowData(1, 10, 100L));
-        write.write(rowData(2, 20, 200L));
-        write.write(rowData(3, 40, 400L));
-        write.compact(binaryRow(1), 0, false);
-        commit.commit(0, write.prepareCommit(true, 0));
-
-        write.write(rowData(1, 10, 100L));
-        write.write(rowData(2, 20, 200L));
-        write.write(rowData(3, 40, 500L));
-        write.compact(binaryRow(1), 0, false);
-        commit.commit(1, write.prepareCommit(true, 1));
-
-        write.close();
-        commit.close();
-
-        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
+    public void testScanDeltaBySnapshotId() throws Exception {
+        writeDataToTable();
 
         Map<String, String> dynamicOptions = new HashMap<>();
         dynamicOptions.put(INCREMENTAL_BETWEEN.key(), "1,4");
@@ -87,8 +68,62 @@ public class IncrementalDeltaStartingScannerTest extends 
ScannerTestBase {
                 .hasSameElementsAs(Arrays.asList("+I 2|20|200", "+I 1|10|100", 
"+I 3|40|500"));
     }
 
+    @Test
+    public void testScanDeltaByTag() throws Exception {
+        writeDataToTable();
+
+        table.createTag("tag-from-snapshot-2", 2L);
+        table.createTag("tag-from-snapshot-4", 4L);
+
+        Map<String, String> dynamicOptions = new HashMap<>();
+        dynamicOptions.put(INCREMENTAL_BETWEEN.key(), 
"tag-from-snapshot-2,tag-from-snapshot-4");
+        dynamicOptions.put(INCREMENTAL_BETWEEN_TAG_TO_SNAPSHOT.key(), "true");
+        dynamicOptions.put(INCREMENTAL_BETWEEN_SCAN_MODE.key(), "delta");
+        List<Split> splits = 
table.copy(dynamicOptions).newScan().plan().splits();
+        assertThat(getResult(table.newRead(), splits))
+                .hasSameElementsAs(Arrays.asList("+I 2|20|200", "+I 1|10|100", 
"+I 3|40|500"));
+    }
+
+    @Test
+    public void testScanChangelogByTag() throws Exception {
+        writeDataToTable();
+
+        table.createTag("tag-from-snapshot-2", 2L);
+        table.createTag("tag-from-snapshot-4", 4L);
+
+        Map<String, String> dynamicOptions = new HashMap<>();
+        dynamicOptions.put(INCREMENTAL_BETWEEN.key(), 
"tag-from-snapshot-2,tag-from-snapshot-4");
+        dynamicOptions.put(INCREMENTAL_BETWEEN_TAG_TO_SNAPSHOT.key(), "true");
+        dynamicOptions.put(INCREMENTAL_BETWEEN_SCAN_MODE.key(), "changelog");
+        List<Split> splits = 
table.copy(dynamicOptions).newScan().plan().splits();
+        assertThat(getResult(table.newRead(), splits))
+                .hasSameElementsAs(Arrays.asList("-U 3|40|400", "+U 
3|40|500"));
+    }
+
     @Test
     public void testIllegalScanSnapshotId() throws Exception {
+        writeDataToTable();
+
+        // Allowed starting snapshotId to be equal to the earliest snapshotId 
-1.
+        assertThatNoException()
+                .isThrownBy(
+                        () ->
+                                
IncrementalDeltaStartingScanner.betweenSnapshotIds(
+                                                0, 4, table.snapshotManager(), 
ScanMode.DELTA)
+                                        .scan(snapshotReader));
+
+        assertThatThrownBy(
+                        () ->
+                                
IncrementalDeltaStartingScanner.betweenSnapshotIds(
+                                                1, 5, table.snapshotManager(), 
ScanMode.DELTA)
+                                        .scan(snapshotReader))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "The specified scan snapshotId range [1, 5] is 
out of available snapshotId range [1, 4]."));
+    }
+
+    private void writeDataToTable() throws Exception {
         SnapshotManager snapshotManager = table.snapshotManager();
         StreamTableWrite write =
                 table.newWrite(commitUser).withIOManager(new 
IOManagerImpl(tempDir.toString()));
@@ -110,24 +145,6 @@ public class IncrementalDeltaStartingScannerTest extends 
ScannerTestBase {
         commit.close();
 
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
-
-        // Allowed starting snapshotId to be equal to the earliest snapshotId 
-1.
-        assertThatNoException()
-                .isThrownBy(
-                        () ->
-                                
IncrementalDeltaStartingScanner.betweenSnapshotIds(
-                                                0, 4, snapshotManager, 
ScanMode.DELTA)
-                                        .scan(snapshotReader));
-
-        assertThatThrownBy(
-                        () ->
-                                
IncrementalDeltaStartingScanner.betweenSnapshotIds(
-                                                1, 5, snapshotManager, 
ScanMode.DELTA)
-                                        .scan(snapshotReader))
-                .satisfies(
-                        anyCauseMatches(
-                                IllegalArgumentException.class,
-                                "The specified scan snapshotId range [1, 5] is 
out of available snapshotId range [1, 4]."));
     }
 
     @Override

Reply via email to