This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 454040be7 [core] Support incremental read on tags (#1535)
454040be7 is described below

commit 454040be70034896184123fe508abec2c6274644
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jul 11 21:13:29 2023 +0800

    [core] Support incremental read on tags (#1535)
---
 docs/content/how-to/querying-tables.md             |   4 +-
 .../paimon/operation/AppendOnlyFileStoreRead.java  |  10 +-
 .../org/apache/paimon/operation/DiffReader.java    | 137 +++++++++++++++++++++
 .../paimon/operation/KeyValueFileStoreRead.java    |  78 +++++++-----
 .../table/source/AbstractInnerTableScan.java       |  12 +-
 .../snapshot/IncrementalTagStartingScanner.java    |  51 ++++++++
 .../table/source/snapshot/SnapshotReader.java      |   2 +
 .../table/source/snapshot/SnapshotReaderImpl.java  |  30 ++++-
 .../apache/paimon/table/system/AuditLogTable.java  |   5 +
 .../apache/paimon/table/IncrementalTableTest.java  |  91 ++++++++++++++
 10 files changed, 377 insertions(+), 43 deletions(-)

diff --git a/docs/content/how-to/querying-tables.md b/docs/content/how-to/querying-tables.md
index 5a454b70a..1b1fdeb13 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -117,7 +117,9 @@ SELECT * FROM t;
 
 Read incremental changes between start snapshot (exclusive) and end snapshot.
 
-For example, '5,10' means changes between snapshot 5 and snapshot 10.
+For example:
+- '5,10' means changes between snapshot 5 and snapshot 10.
+- 'TAG1,TAG3' means changes between TAG1 and TAG3.
 
 {{< tabs "incremental-example" >}}
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
index 2a90a0bac..1128d28bc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
@@ -39,6 +39,9 @@ import org.apache.paimon.utils.BulkFormatMapping;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Projection;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -52,10 +55,11 @@ import static 
org.apache.paimon.predicate.PredicateBuilder.splitAnd;
 /** {@link FileStoreRead} for {@link AppendOnlyFileStore}. */
 public class AppendOnlyFileStoreRead implements FileStoreRead<InternalRow> {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(AppendOnlyFileStoreRead.class);
+
     private final FileIO fileIO;
     private final SchemaManager schemaManager;
     private final long schemaId;
-    private final RowType rowType;
     private final FileFormatDiscover formatDiscover;
     private final FileStorePathFactory pathFactory;
     private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
@@ -74,7 +78,6 @@ public class AppendOnlyFileStoreRead implements 
FileStoreRead<InternalRow> {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
         this.schemaId = schemaId;
-        this.rowType = rowType;
         this.formatDiscover = formatDiscover;
         this.pathFactory = pathFactory;
         this.bulkFormatMappings = new HashMap<>();
@@ -98,6 +101,9 @@ public class AppendOnlyFileStoreRead implements 
FileStoreRead<InternalRow> {
         DataFilePathFactory dataFilePathFactory =
                 pathFactory.createDataFilePathFactory(split.partition(), 
split.bucket());
         List<ConcatRecordReader.ReaderSupplier<InternalRow>> suppliers = new 
ArrayList<>();
+        if (split.beforeFiles().size() > 0) {
+            LOG.info("Ignore split before files: " + split.beforeFiles());
+        }
         for (DataFileMeta file : split.dataFiles()) {
             String formatIdentifier = 
DataFilePathFactory.formatIdentifier(file.fileName());
             BulkFormatMapping bulkFormatMapping =
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DiffReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/DiffReader.java
new file mode 100644
index 000000000..324bff935
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DiffReader.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.mergetree.MergeSorter;
+import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+/** A {@link RecordReader} util to read diff between before reader and after 
reader. */
+public class DiffReader {
+
+    private static final int BEFORE_LEVEL = Integer.MIN_VALUE; // tag for beforeReader records
+    private static final int AFTER_LEVEL = Integer.MAX_VALUE; // tag for afterReader records
+
+    public static RecordReader<KeyValue> readDiff(
+            RecordReader<KeyValue> beforeReader,
+            RecordReader<KeyValue> afterReader,
+            Comparator<InternalRow> keyComparator,
+            MergeSorter sorter,
+            boolean keepDelete)
+            throws IOException {
+        return sorter.mergeSort( // merge both tagged inputs; DiffMerger picks what to emit
+                Arrays.asList(
+                        () -> wrapLevelToReader(beforeReader, BEFORE_LEVEL), // supplier: lazy
+                        () -> wrapLevelToReader(afterReader, AFTER_LEVEL)),
+                keyComparator,
+                new DiffMerger(keepDelete));
+    }
+
+    private static RecordReader<KeyValue> wrapLevelToReader( // stamps 'level' on every record
+            RecordReader<KeyValue> reader, int level) {
+        return new RecordReader<KeyValue>() {
+            @Nullable
+            @Override
+            public RecordIterator<KeyValue> readBatch() throws IOException {
+                RecordIterator<KeyValue> batch = reader.readBatch();
+                if (batch == null) { // a null batch is passed through unchanged
+                    return null;
+                }
+
+                return new RecordIterator<KeyValue>() {
+                    @Nullable
+                    @Override
+                    public KeyValue next() throws IOException {
+                        KeyValue kv = batch.next();
+                        if (kv != null) {
+                            kv.setLevel(level); // mark which input this record came from
+                        }
+                        return kv;
+                    }
+
+                    @Override
+                    public void releaseBatch() {
+                        batch.releaseBatch(); // delegate to the wrapped batch
+                    }
+                };
+            }
+
+            @Override
+            public void close() throws IOException {
+                reader.close(); // closing the wrapper closes the wrapped reader
+            }
+        };
+    }
+
+    private static class DiffMerger implements MergeFunctionWrapper<KeyValue> {
+
+        private final boolean keepDelete; // emit DELETE for records found only in 'before'
+
+        private final List<KeyValue> kvs = new ArrayList<>(); // records added since reset()
+
+        public DiffMerger(boolean keepDelete) {
+            this.keepDelete = keepDelete;
+        }
+
+        @Override
+        public void reset() {
+            this.kvs.clear();
+        }
+
+        @Override
+        public void add(KeyValue kv) {
+            this.kvs.add(kv); // presumably all adds between resets share one key - TODO confirm
+        }
+
+        @Nullable
+        @Override
+        public KeyValue getResult() {
+            if (kvs.size() == 1) { // record present on one side only
+                KeyValue kv = kvs.get(0);
+                if (kv.level() == BEFORE_LEVEL) { // only in 'before'
+                    if (keepDelete) {
+                        return kv.replaceValueKind(RowKind.DELETE);
+                    } // otherwise falls through to 'return null'
+                } else {
+                    return kv; // only in 'after': emitted as is
+                }
+            } else if (kvs.size() == 2) { // one record from each side expected
+                KeyValue latest = kvs.get(1); // last added; order is decided by the sorter
+                if (latest.level() == AFTER_LEVEL) { // emitted only when it is the 'after' one
+                    return latest;
+                }
+            } else { // 0 or more than 2 records is treated as a caller error
+                throw new IllegalArgumentException("Illegal kv number: " + 
kvs.size());
+            }
+
+            return null; // nothing to emit for this group
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index b3aa93d53..3a666e709 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -21,6 +21,7 @@ package org.apache.paimon.operation;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueFileStore;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.format.FileFormatDiscover;
@@ -199,43 +200,54 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
                     ? dataSupplier.get()
                     : ConcatRecordReader.create(Arrays.asList(beforeSupplier, 
dataSupplier));
         } else {
-            // Sections are read by SortMergeReader, which sorts and merges 
records by keys.
-            // So we cannot project keys or else the sorting will be incorrect.
-            KeyValueFileReaderFactory overlappedSectionFactory =
-                    readerFactoryBuilder.build(
-                            split.partition(), split.bucket(), false, 
filtersForOverlappedSection);
-            KeyValueFileReaderFactory nonOverlappedSectionFactory =
-                    readerFactoryBuilder.build(
-                            split.partition(),
-                            split.bucket(),
-                            false,
-                            filtersForNonOverlappedSection);
+            return split.beforeFiles().isEmpty()
+                    ? batchMergeRead(
+                            split.partition(), split.bucket(), 
split.dataFiles(), forceKeepDelete)
+                    : DiffReader.readDiff(
+                            batchMergeRead(
+                                    split.partition(), split.bucket(), 
split.beforeFiles(), false),
+                            batchMergeRead(
+                                    split.partition(), split.bucket(), 
split.dataFiles(), false),
+                            keyComparator,
+                            mergeSorter,
+                            forceKeepDelete);
+        }
+    }
 
-            List<ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
-            MergeFunctionWrapper<KeyValue> mergeFuncWrapper =
-                    new 
ReducerMergeFunctionWrapper(mfFactory.create(pushdownProjection));
-            for (List<SortedRun> section :
-                    new IntervalPartition(split.dataFiles(), 
keyComparator).partition()) {
-                sectionReaders.add(
-                        () ->
-                                MergeTreeReaders.readerForSection(
-                                        section,
-                                        section.size() > 1
-                                                ? overlappedSectionFactory
-                                                : nonOverlappedSectionFactory,
-                                        keyComparator,
-                                        mergeFuncWrapper,
-                                        mergeSorter));
-            }
+    private RecordReader<KeyValue> batchMergeRead(
+            BinaryRow partition, int bucket, List<DataFileMeta> files, boolean 
keepDelete)
+            throws IOException {
+        // Sections are read by SortMergeReader, which sorts and merges 
records by keys.
+        // So we cannot project keys or else the sorting will be incorrect.
+        KeyValueFileReaderFactory overlappedSectionFactory =
+                readerFactoryBuilder.build(partition, bucket, false, 
filtersForOverlappedSection);
+        KeyValueFileReaderFactory nonOverlappedSectionFactory =
+                readerFactoryBuilder.build(
+                        partition, bucket, false, 
filtersForNonOverlappedSection);
 
-            RecordReader<KeyValue> reader = 
ConcatRecordReader.create(sectionReaders);
-            if (!forceKeepDelete) {
-                reader = new DropDeleteReader(reader);
-            }
+        List<ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
+        MergeFunctionWrapper<KeyValue> mergeFuncWrapper =
+                new 
ReducerMergeFunctionWrapper(mfFactory.create(pushdownProjection));
+        for (List<SortedRun> section : new IntervalPartition(files, 
keyComparator).partition()) {
+            sectionReaders.add(
+                    () ->
+                            MergeTreeReaders.readerForSection(
+                                    section,
+                                    section.size() > 1
+                                            ? overlappedSectionFactory
+                                            : nonOverlappedSectionFactory,
+                                    keyComparator,
+                                    mergeFuncWrapper,
+                                    mergeSorter));
+        }
 
-            // Project results from SortMergeReader using 
ProjectKeyRecordReader.
-            return keyProjectedFields == null ? reader : projectKey(reader, 
keyProjectedFields);
+        RecordReader<KeyValue> reader = 
ConcatRecordReader.create(sectionReaders);
+        if (!keepDelete) {
+            reader = new DropDeleteReader(reader);
         }
+
+        // Project results from SortMergeReader using ProjectKeyRecordReader.
+        return keyProjectedFields == null ? reader : projectKey(reader, 
keyProjectedFields);
     }
 
     private RecordReader<KeyValue> streamingConcat(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 374050af6..9f119363d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -34,6 +34,7 @@ import 
org.apache.paimon.table.source.snapshot.ContinuousLatestStartingScanner;
 import org.apache.paimon.table.source.snapshot.FullCompactedStartingScanner;
 import org.apache.paimon.table.source.snapshot.FullStartingScanner;
 import org.apache.paimon.table.source.snapshot.IncrementalStartingScanner;
+import org.apache.paimon.table.source.snapshot.IncrementalTagStartingScanner;
 import 
org.apache.paimon.table.source.snapshot.IncrementalTimeStampStartingScanner;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
@@ -137,9 +138,14 @@ public abstract class AbstractInnerTableScan implements 
InnerTableScan {
                 checkArgument(!isStreaming, "Cannot read incremental in 
streaming mode.");
                 Pair<String, String> incrementalBetween = 
options.incrementalBetween();
                 if (options.toMap().get(CoreOptions.INCREMENTAL_BETWEEN.key()) 
!= null) {
-                    return new IncrementalStartingScanner(
-                            Long.parseLong(incrementalBetween.getLeft()),
-                            Long.parseLong(incrementalBetween.getRight()));
+                    try {
+                        return new IncrementalStartingScanner(
+                                Long.parseLong(incrementalBetween.getLeft()),
+                                Long.parseLong(incrementalBetween.getRight()));
+                    } catch (NumberFormatException e) {
+                        return new IncrementalTagStartingScanner(
+                                incrementalBetween.getLeft(), 
incrementalBetween.getRight());
+                    }
                 } else {
                     return new IncrementalTimeStampStartingScanner(
                             Long.parseLong(incrementalBetween.getLeft()),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
new file mode 100644
index 000000000..242db6b19
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.snapshot;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+/** {@link StartingScanner} for incremental changes between two tags (start tag, end tag). */
+public class IncrementalTagStartingScanner implements StartingScanner {
+
+    private final String start; // name of the tag used as the diff baseline
+    private final String end; // name of the tag whose snapshot is read
+
+    public IncrementalTagStartingScanner(String start, String end) {
+        this.start = start;
+        this.end = end;
+    }
+
+    @Override
+    public Result scan(SnapshotManager manager, SnapshotReader reader) {
+        TagManager tagManager = new TagManager(manager.fileIO(), 
manager.tablePath());
+        Snapshot tag1 = tagManager.taggedSnapshot(start); // snapshot behind the start tag
+        Snapshot tag2 = tagManager.taggedSnapshot(end); // snapshot behind the end tag
+
+        if (tag2.id() <= tag1.id()) { // end must have a strictly larger snapshot id than start
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Tag end %s with snapshot id %s should be larger 
than tag start %s with snapshot id %s",
+                            end, tag2.id(), start, tag1.id()));
+        }
+
+        return 
StartingScanner.fromPlan(reader.withSnapshot(tag2).readIncrementalDiff(tag1));
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 3fb8f0bb5..ae547ed97 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -61,6 +61,8 @@ public interface SnapshotReader {
     /** Get splits plan from an overwritten snapshot. */
     Plan readOverwrittenChanges();
 
+    Plan readIncrementalDiff(Snapshot before);
+
     /** Get partitions from a snapshot. */
     List<BinaryRow> partitions();
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 136ecd1ab..3c17bb51d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -239,13 +239,20 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
                     "Cannot read overwrite splits from a non-overwrite 
snapshot.");
         }
 
-        List<DataSplit> splits = new ArrayList<>();
-
         Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles =
                 groupByPartFiles(plan.files(FileKind.DELETE));
         Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles =
                 groupByPartFiles(plan.files(FileKind.ADD));
 
+        return toChangesPlan(true, plan, beforeFiles, dataFiles);
+    }
+
+    private Plan toChangesPlan(
+            boolean isStreaming,
+            FileStoreScan.Plan plan,
+            Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles,
+            Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles) {
+        List<DataSplit> splits = new ArrayList<>();
         Map<BinaryRow, Set<Integer>> buckets = new HashMap<>();
         beforeFiles.forEach(
                 (part, bucketMap) ->
@@ -267,14 +274,18 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
                         dataFiles
                                 .getOrDefault(part, Collections.emptyMap())
                                 .getOrDefault(bucket, Collections.emptyList());
+
+                // deduplicate
+                before.removeIf(data::remove);
+
                 DataSplit split =
                         DataSplit.builder()
-                                .withSnapshot(snapshotId)
+                                .withSnapshot(plan.snapshotId())
                                 .withPartition(part)
                                 .withBucket(bucket)
                                 .withBeforeFiles(before)
                                 .withDataFiles(data)
-                                .isStreaming(true)
+                                .isStreaming(isStreaming)
                                 .build();
                 splits.add(split);
             }
@@ -300,6 +311,17 @@ public class SnapshotReaderImpl implements SnapshotReader {
         };
     }
 
+    @Override
+    public Plan readIncrementalDiff(Snapshot before) { // diff: configured snapshot vs 'before'
+        withKind(ScanKind.ALL);
+        FileStoreScan.Plan plan = scan.plan(); // plan of the currently configured snapshot
+        Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles =
+                groupByPartFiles(plan.files(FileKind.ADD)); // ADD files of that plan
+        Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles =
+                
groupByPartFiles(scan.withSnapshot(before).plan().files(FileKind.ADD));
+        // NOTE(review): the shared 'scan' is re-targeted at 'before' above - confirm that is safe
+        return toChangesPlan(false, plan, beforeFiles, dataFiles); // false: isStreaming off
+    }
+
     private RecordComparator partitionComparator() {
         if (lazyPartitionComparator == null) {
             lazyPartitionComparator =
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 68f2a3b7a..5750bb822 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -242,6 +242,11 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
             return snapshotReader.readOverwrittenChanges();
         }
 
+        @Override
+        public Plan readIncrementalDiff(Snapshot before) {
+            return snapshotReader.readIncrementalDiff(before); // plain delegation, no changes
+        }
+
         @Override
         public List<BinaryRow> partitions() {
             return snapshotReader.partitions();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
index b69b5c5e1..a7602bb72 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
@@ -35,6 +35,7 @@ import static 
org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
 import static org.apache.paimon.data.BinaryString.fromString;
 import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link CoreOptions#INCREMENTAL_BETWEEN}. */
 public class IncrementalTableTest extends TableTestBase {
@@ -199,4 +200,94 @@ public class IncrementalTableTest extends TableTestBase {
                         GenericRow.of(fromString("-D"), 1, 2, 1),
                         GenericRow.of(fromString("-D"), 1, 3, 1));
     }
+
+    @Test
+    public void testTagIncremental() throws Exception {
+        Identifier identifier = identifier("T");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pt", DataTypes.INT())
+                        .column("pk", DataTypes.INT())
+                        .column("col1", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .primaryKey("pk", "pt")
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        Table table = catalog.getTable(identifier);
+        Table auditLog = catalog.getTable(identifier("T$audit_log"));
+
+        // snapshot 1: initial rows, three with pt=1 and one with pt=2
+        write(
+                table,
+                GenericRow.of(1, 1, 1),
+                GenericRow.of(1, 2, 1),
+                GenericRow.of(1, 3, 1),
+                GenericRow.of(2, 1, 1));
+
+        // snapshot 2: one delete, one update and one new row, all with pt=1
+        write(
+                table,
+                // DELETE of (pt=1, pk=1)
+                GenericRow.ofKind(RowKind.DELETE, 1, 1, 1),
+                // UPDATE of (pt=1, pk=2): col1 1 -> 2
+                GenericRow.of(1, 2, 2),
+                // NEW row (pt=1, pk=4)
+                GenericRow.of(1, 4, 1));
+
+        // snapshot 3: compact partition pt=1, bucket 0
+        compact(table, row(1), 0);
+
+        table.createTag("TAG1", 1); // TAG1 -> snapshot 1
+        table.createTag("TAG2", 2); // TAG2 -> snapshot 2
+        table.createTag("TAG3", 3); // TAG3 -> snapshot 3
+
+        // diff TAG1 -> TAG2: table read omits the delete, audit log reports it as -D
+        List<InternalRow> result = read(table, Pair.of(INCREMENTAL_BETWEEN, 
"TAG1,TAG2"));
+        assertThat(result)
+                .containsExactlyInAnyOrder(GenericRow.of(1, 2, 2), 
GenericRow.of(1, 4, 1));
+        result = read(auditLog, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG2"));
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(fromString("-D"), 1, 1, 1),
+                        GenericRow.of(fromString("+I"), 1, 2, 2),
+                        GenericRow.of(fromString("+I"), 1, 4, 1));
+
+        // diff TAG1 -> TAG3: same expected rows as TAG1 -> TAG2, now read after the compaction
+        result = read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3"));
+        assertThat(result)
+                .containsExactlyInAnyOrder(GenericRow.of(1, 2, 2), 
GenericRow.of(1, 4, 1));
+        result = read(auditLog, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3"));
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(fromString("-D"), 1, 1, 1),
+                        GenericRow.of(fromString("+I"), 1, 2, 2),
+                        GenericRow.of(fromString("+I"), 1, 4, 1));
+
+        assertThatThrownBy(() -> read(table, Pair.of(INCREMENTAL_BETWEEN, 
"TAG2,TAG1")))
+                .hasMessageContaining( // reversed tag order must be rejected
+                        "Tag end TAG1 with snapshot id 1 should be larger than 
tag start TAG2 with snapshot id 2");
+    }
+
+    @Test
+    public void testAppendTableTag() throws Exception {
+        Identifier identifier = identifier("T");
+        Schema schema = // no primaryKey(...) call here, unlike testTagIncremental
+                Schema.newBuilder()
+                        .column("pt", DataTypes.INT())
+                        .column("pk", DataTypes.INT())
+                        .column("col1", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        Table table = catalog.getTable(identifier);
+
+        write(table, GenericRow.of(1, 1, 1)); // snapshot 1
+        write(table, GenericRow.of(1, 1, 2)); // snapshot 2
+
+        table.createTag("TAG1", 1); // TAG1 -> snapshot 1
+        table.createTag("TAG2", 2); // TAG2 -> snapshot 2
+
+        assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG2")))
+                .containsExactlyInAnyOrder(GenericRow.of(1, 1, 2)); // only the row written after TAG1
+    }
 }

Reply via email to