This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 454040be7 [core] Support incremental read on tags (#1535)
454040be7 is described below
commit 454040be70034896184123fe508abec2c6274644
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jul 11 21:13:29 2023 +0800
[core] Support incremental read on tags (#1535)
---
docs/content/how-to/querying-tables.md | 4 +-
.../paimon/operation/AppendOnlyFileStoreRead.java | 10 +-
.../org/apache/paimon/operation/DiffReader.java | 137 +++++++++++++++++++++
.../paimon/operation/KeyValueFileStoreRead.java | 78 +++++++-----
.../table/source/AbstractInnerTableScan.java | 12 +-
.../snapshot/IncrementalTagStartingScanner.java | 51 ++++++++
.../table/source/snapshot/SnapshotReader.java | 2 +
.../table/source/snapshot/SnapshotReaderImpl.java | 30 ++++-
.../apache/paimon/table/system/AuditLogTable.java | 5 +
.../apache/paimon/table/IncrementalTableTest.java | 91 ++++++++++++++
10 files changed, 377 insertions(+), 43 deletions(-)
diff --git a/docs/content/how-to/querying-tables.md
b/docs/content/how-to/querying-tables.md
index 5a454b70a..1b1fdeb13 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -117,7 +117,9 @@ SELECT * FROM t;
Read incremental changes between start snapshot (exclusive) and end snapshot.
-For example, '5,10' means changes between snapshot 5 and snapshot 10.
+For example:
+- '5,10' means changes between snapshot 5 and snapshot 10.
+- 'TAG1,TAG3' means changes between tag TAG1 and tag TAG3; TAG3 must point to a newer snapshot than TAG1.
{{< tabs "incremental-example" >}}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
index 2a90a0bac..1128d28bc 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
@@ -39,6 +39,9 @@ import org.apache.paimon.utils.BulkFormatMapping;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Projection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.io.IOException;
@@ -52,10 +55,11 @@ import static
org.apache.paimon.predicate.PredicateBuilder.splitAnd;
/** {@link FileStoreRead} for {@link AppendOnlyFileStore}. */
public class AppendOnlyFileStoreRead implements FileStoreRead<InternalRow> {
+ private static final Logger LOG =
LoggerFactory.getLogger(AppendOnlyFileStoreRead.class);
+
private final FileIO fileIO;
private final SchemaManager schemaManager;
private final long schemaId;
- private final RowType rowType;
private final FileFormatDiscover formatDiscover;
private final FileStorePathFactory pathFactory;
private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
@@ -74,7 +78,6 @@ public class AppendOnlyFileStoreRead implements
FileStoreRead<InternalRow> {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
- this.rowType = rowType;
this.formatDiscover = formatDiscover;
this.pathFactory = pathFactory;
this.bulkFormatMappings = new HashMap<>();
@@ -98,6 +101,9 @@ public class AppendOnlyFileStoreRead implements
FileStoreRead<InternalRow> {
DataFilePathFactory dataFilePathFactory =
pathFactory.createDataFilePathFactory(split.partition(),
split.bucket());
List<ConcatRecordReader.ReaderSupplier<InternalRow>> suppliers = new
ArrayList<>();
+ if (split.beforeFiles().size() > 0) {
+ LOG.info("Ignore split before files: " + split.beforeFiles());
+ }
for (DataFileMeta file : split.dataFiles()) {
String formatIdentifier =
DataFilePathFactory.formatIdentifier(file.fileName());
BulkFormatMapping bulkFormatMapping =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/DiffReader.java
b/paimon-core/src/main/java/org/apache/paimon/operation/DiffReader.java
new file mode 100644
index 000000000..324bff935
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DiffReader.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.mergetree.MergeSorter;
+import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+/** A {@link RecordReader} util to read diff between before reader and after
reader. */
+public class DiffReader {
+
+ private static final int BEFORE_LEVEL = Integer.MIN_VALUE;
+ private static final int AFTER_LEVEL = Integer.MAX_VALUE;
+
+ public static RecordReader<KeyValue> readDiff(
+ RecordReader<KeyValue> beforeReader,
+ RecordReader<KeyValue> afterReader,
+ Comparator<InternalRow> keyComparator,
+ MergeSorter sorter,
+ boolean keepDelete)
+ throws IOException {
+ return sorter.mergeSort(
+ Arrays.asList(
+ () -> wrapLevelToReader(beforeReader, BEFORE_LEVEL),
+ () -> wrapLevelToReader(afterReader, AFTER_LEVEL)),
+ keyComparator,
+ new DiffMerger(keepDelete));
+ }
+
+ private static RecordReader<KeyValue> wrapLevelToReader(
+ RecordReader<KeyValue> reader, int level) {
+ return new RecordReader<KeyValue>() {
+ @Nullable
+ @Override
+ public RecordIterator<KeyValue> readBatch() throws IOException {
+ RecordIterator<KeyValue> batch = reader.readBatch();
+ if (batch == null) {
+ return null;
+ }
+
+ return new RecordIterator<KeyValue>() {
+ @Nullable
+ @Override
+ public KeyValue next() throws IOException {
+ KeyValue kv = batch.next();
+ if (kv != null) {
+ kv.setLevel(level);
+ }
+ return kv;
+ }
+
+ @Override
+ public void releaseBatch() {
+ batch.releaseBatch();
+ }
+ };
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+ };
+ }
+
+ private static class DiffMerger implements MergeFunctionWrapper<KeyValue> {
+
+ private final boolean keepDelete;
+
+ private final List<KeyValue> kvs = new ArrayList<>();
+
+ public DiffMerger(boolean keepDelete) {
+ this.keepDelete = keepDelete;
+ }
+
+ @Override
+ public void reset() {
+ this.kvs.clear();
+ }
+
+ @Override
+ public void add(KeyValue kv) {
+ this.kvs.add(kv);
+ }
+
+ @Nullable
+ @Override
+ public KeyValue getResult() {
+ if (kvs.size() == 1) {
+ KeyValue kv = kvs.get(0);
+ if (kv.level() == BEFORE_LEVEL) {
+ if (keepDelete) {
+ return kv.replaceValueKind(RowKind.DELETE);
+ }
+ } else {
+ return kv;
+ }
+ } else if (kvs.size() == 2) {
+ KeyValue latest = kvs.get(1);
+ if (latest.level() == AFTER_LEVEL) {
+ return latest;
+ }
+ } else {
+ throw new IllegalArgumentException("Illegal kv number: " +
kvs.size());
+ }
+
+ return null;
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index b3aa93d53..3a666e709 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -21,6 +21,7 @@ package org.apache.paimon.operation;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueFileStore;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.format.FileFormatDiscover;
@@ -199,43 +200,54 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
? dataSupplier.get()
: ConcatRecordReader.create(Arrays.asList(beforeSupplier,
dataSupplier));
} else {
- // Sections are read by SortMergeReader, which sorts and merges
records by keys.
- // So we cannot project keys or else the sorting will be incorrect.
- KeyValueFileReaderFactory overlappedSectionFactory =
- readerFactoryBuilder.build(
- split.partition(), split.bucket(), false,
filtersForOverlappedSection);
- KeyValueFileReaderFactory nonOverlappedSectionFactory =
- readerFactoryBuilder.build(
- split.partition(),
- split.bucket(),
- false,
- filtersForNonOverlappedSection);
+ return split.beforeFiles().isEmpty()
+ ? batchMergeRead(
+ split.partition(), split.bucket(),
split.dataFiles(), forceKeepDelete)
+ : DiffReader.readDiff(
+ batchMergeRead(
+ split.partition(), split.bucket(),
split.beforeFiles(), false),
+ batchMergeRead(
+ split.partition(), split.bucket(),
split.dataFiles(), false),
+ keyComparator,
+ mergeSorter,
+ forceKeepDelete);
+ }
+ }
- List<ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
- MergeFunctionWrapper<KeyValue> mergeFuncWrapper =
- new
ReducerMergeFunctionWrapper(mfFactory.create(pushdownProjection));
- for (List<SortedRun> section :
- new IntervalPartition(split.dataFiles(),
keyComparator).partition()) {
- sectionReaders.add(
- () ->
- MergeTreeReaders.readerForSection(
- section,
- section.size() > 1
- ? overlappedSectionFactory
- : nonOverlappedSectionFactory,
- keyComparator,
- mergeFuncWrapper,
- mergeSorter));
- }
+ /**
+ * Reads {@code files} of one bucket with sort-merge: the files are partitioned into sections by
+ * {@link IntervalPartition}, each section is merged through a {@link
+ * ReducerMergeFunctionWrapper}, and the section readers are concatenated.
+ *
+ * @param keepDelete when false, the concatenated reader is wrapped in {@link DropDeleteReader}
+ */
+ private RecordReader<KeyValue> batchMergeRead(
+ BinaryRow partition, int bucket, List<DataFileMeta> files, boolean
keepDelete)
+ throws IOException {
+ // Sections are read by SortMergeReader, which sorts and merges records by keys.
+ // So we cannot project keys or else the sorting will be incorrect.
+ KeyValueFileReaderFactory overlappedSectionFactory =
+ readerFactoryBuilder.build(partition, bucket, false,
filtersForOverlappedSection);
+ KeyValueFileReaderFactory nonOverlappedSectionFactory =
+ readerFactoryBuilder.build(
+ partition, bucket, false,
filtersForNonOverlappedSection);
- RecordReader<KeyValue> reader =
ConcatRecordReader.create(sectionReaders);
- if (!forceKeepDelete) {
- reader = new DropDeleteReader(reader);
- }
+ List<ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
+ MergeFunctionWrapper<KeyValue> mergeFuncWrapper =
+ new
ReducerMergeFunctionWrapper(mfFactory.create(pushdownProjection));
+ for (List<SortedRun> section : new IntervalPartition(files,
keyComparator).partition()) {
+ sectionReaders.add(
+ () ->
+ MergeTreeReaders.readerForSection(
+ section,
+ section.size() > 1
+ ? overlappedSectionFactory
+ : nonOverlappedSectionFactory,
+ keyComparator,
+ mergeFuncWrapper,
+ mergeSorter));
+ }
- // Project results from SortMergeReader using
ProjectKeyRecordReader.
- return keyProjectedFields == null ? reader : projectKey(reader,
keyProjectedFields);
+ RecordReader<KeyValue> reader =
ConcatRecordReader.create(sectionReaders);
+ if (!keepDelete) {
+ reader = new DropDeleteReader(reader);
}
+
+ // NOTE(review): for splits with before files, createReader passes the output of two calls of
+ // this method to DiffReader.readDiff, which merge-sorts them again with keyComparator. If
+ // keyProjectedFields is set, the keys are already projected by then, which the comment at the
+ // top of this method says breaks sorting. Confirm key projection is never combined with
+ // before files.
+ // Project results from SortMergeReader using ProjectKeyRecordReader.
+ return keyProjectedFields == null ? reader : projectKey(reader,
keyProjectedFields);
}
private RecordReader<KeyValue> streamingConcat(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 374050af6..9f119363d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -34,6 +34,7 @@ import
org.apache.paimon.table.source.snapshot.ContinuousLatestStartingScanner;
import org.apache.paimon.table.source.snapshot.FullCompactedStartingScanner;
import org.apache.paimon.table.source.snapshot.FullStartingScanner;
import org.apache.paimon.table.source.snapshot.IncrementalStartingScanner;
+import org.apache.paimon.table.source.snapshot.IncrementalTagStartingScanner;
import
org.apache.paimon.table.source.snapshot.IncrementalTimeStampStartingScanner;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
@@ -137,9 +138,14 @@ public abstract class AbstractInnerTableScan implements
InnerTableScan {
checkArgument(!isStreaming, "Cannot read incremental in
streaming mode.");
Pair<String, String> incrementalBetween =
options.incrementalBetween();
if (options.toMap().get(CoreOptions.INCREMENTAL_BETWEEN.key())
!= null) {
- return new IncrementalStartingScanner(
- Long.parseLong(incrementalBetween.getLeft()),
- Long.parseLong(incrementalBetween.getRight()));
+ try {
+ return new IncrementalStartingScanner(
+ Long.parseLong(incrementalBetween.getLeft()),
+ Long.parseLong(incrementalBetween.getRight()));
+ } catch (NumberFormatException e) {
+ return new IncrementalTagStartingScanner(
+ incrementalBetween.getLeft(),
incrementalBetween.getRight());
+ }
} else {
return new IncrementalTimeStampStartingScanner(
Long.parseLong(incrementalBetween.getLeft()),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
new file mode 100644
index 000000000..242db6b19
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.snapshot;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+/** {@link StartingScanner} that plans the incremental changes between two tags. */
+public class IncrementalTagStartingScanner implements StartingScanner {
+
+    /** Name of the tag whose snapshot is the baseline of the diff. */
+    private final String start;
+
+    /** Name of the tag whose snapshot is compared against the baseline. */
+    private final String end;
+
+    public IncrementalTagStartingScanner(String start, String end) {
+        this.start = start;
+        this.end = end;
+    }
+
+    /**
+     * Resolves both tags to their snapshots and plans the diff from the start tag's snapshot to
+     * the end tag's snapshot.
+     *
+     * @throws IllegalArgumentException if the end tag's snapshot id is not larger than the start
+     *     tag's snapshot id
+     */
+    @Override
+    public Result scan(SnapshotManager manager, SnapshotReader reader) {
+        TagManager tags = new TagManager(manager.fileIO(), manager.tablePath());
+        Snapshot startSnapshot = tags.taggedSnapshot(start);
+        Snapshot endSnapshot = tags.taggedSnapshot(end);
+
+        if (endSnapshot.id() > startSnapshot.id()) {
+            return StartingScanner.fromPlan(
+                    reader.withSnapshot(endSnapshot).readIncrementalDiff(startSnapshot));
+        }
+
+        throw new IllegalArgumentException(
+                String.format(
+                        "Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s",
+                        end, endSnapshot.id(), start, startSnapshot.id()));
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 3fb8f0bb5..ae547ed97 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -61,6 +61,8 @@ public interface SnapshotReader {
/** Get splits plan from an overwritten snapshot. */
Plan readOverwrittenChanges();
+ /** Get splits plan of the changes from the {@code before} snapshot to this reader's snapshot. */
+ Plan readIncrementalDiff(Snapshot before);
+
/** Get partitions from a snapshot. */
List<BinaryRow> partitions();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 136ecd1ab..3c17bb51d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -239,13 +239,20 @@ public class SnapshotReaderImpl implements SnapshotReader
{
"Cannot read overwrite splits from a non-overwrite
snapshot.");
}
- List<DataSplit> splits = new ArrayList<>();
-
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles =
groupByPartFiles(plan.files(FileKind.DELETE));
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles =
groupByPartFiles(plan.files(FileKind.ADD));
+ return toChangesPlan(true, plan, beforeFiles, dataFiles);
+ }
+
+ private Plan toChangesPlan(
+ boolean isStreaming,
+ FileStoreScan.Plan plan,
+ Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles,
+ Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles) {
+ List<DataSplit> splits = new ArrayList<>();
Map<BinaryRow, Set<Integer>> buckets = new HashMap<>();
beforeFiles.forEach(
(part, bucketMap) ->
@@ -267,14 +274,18 @@ public class SnapshotReaderImpl implements SnapshotReader
{
dataFiles
.getOrDefault(part, Collections.emptyMap())
.getOrDefault(bucket, Collections.emptyList());
+
+ // deduplicate
+ before.removeIf(data::remove);
+
DataSplit split =
DataSplit.builder()
- .withSnapshot(snapshotId)
+ .withSnapshot(plan.snapshotId())
.withPartition(part)
.withBucket(bucket)
.withBeforeFiles(before)
.withDataFiles(data)
- .isStreaming(true)
+ .isStreaming(isStreaming)
.build();
splits.add(split);
}
@@ -300,6 +311,17 @@ public class SnapshotReaderImpl implements SnapshotReader {
};
}
+ /**
+ * Plans the changes from {@code before} to the snapshot this reader is set to. Files ADDed in
+ * the current plan become the data files, and files ADDed in the plan of {@code before} become
+ * the before files. {@code toChangesPlan} then builds one non-streaming split per partition and
+ * bucket, dropping files that appear on both sides.
+ *
+ * <p>NOTE(review): {@code scan.withSnapshot(before)} is invoked on the shared {@code scan}, so
+ * the scan presumably stays configured with {@code before} after this call. Confirm that later
+ * calls on the same reader are not affected.
+ */
+ @Override
+ public Plan readIncrementalDiff(Snapshot before) {
+ withKind(ScanKind.ALL);
+ FileStoreScan.Plan plan = scan.plan();
+ Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles =
+ groupByPartFiles(plan.files(FileKind.ADD));
+ // The after plan must be taken before the scan is re-pointed to `before` below.
+ Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles =
+
groupByPartFiles(scan.withSnapshot(before).plan().files(FileKind.ADD));
+ return toChangesPlan(false, plan, beforeFiles, dataFiles);
+ }
+
private RecordComparator partitionComparator() {
if (lazyPartitionComparator == null) {
lazyPartitionComparator =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 68f2a3b7a..5750bb822 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -242,6 +242,11 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return snapshotReader.readOverwrittenChanges();
}
+ /** Delegates incremental diff planning to the wrapped {@code snapshotReader}. */
+ @Override
+ public Plan readIncrementalDiff(Snapshot before) {
+ return snapshotReader.readIncrementalDiff(before);
+ }
+
@Override
public List<BinaryRow> partitions() {
return snapshotReader.partitions();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
index b69b5c5e1..a7602bb72 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
@@ -35,6 +35,7 @@ import static
org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.data.BinaryString.fromString;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link CoreOptions#INCREMENTAL_BETWEEN}. */
public class IncrementalTableTest extends TableTestBase {
@@ -199,4 +200,94 @@ public class IncrementalTableTest extends TableTestBase {
GenericRow.of(fromString("-D"), 1, 2, 1),
GenericRow.of(fromString("-D"), 1, 3, 1));
}
+
+ /**
+ * Incremental read between tags of a primary-key table. TAG1 holds the first write. TAG2 adds a
+ * delete, an update and an insert. TAG3 points to snapshot 3, a compaction. The diff is checked
+ * for both the table and its audit log, and a reversed tag order must be rejected.
+ */
+ @Test
+ public void testTagIncremental() throws Exception {
+ Identifier identifier = identifier("T");
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("pk", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .build();
+ catalog.createTable(identifier, schema, true);
+ Table table = catalog.getTable(identifier);
+ Table auditLog = catalog.getTable(identifier("T$audit_log"));
+
+ // snapshot 1: append
+ write(
+ table,
+ GenericRow.of(1, 1, 1),
+ GenericRow.of(1, 2, 1),
+ GenericRow.of(1, 3, 1),
+ GenericRow.of(2, 1, 1));
+
+ // snapshot 2: append
+ write(
+ table,
+ // DELETE
+ GenericRow.ofKind(RowKind.DELETE, 1, 1, 1),
+ // UPDATE
+ GenericRow.of(1, 2, 2),
+ // NEW
+ GenericRow.of(1, 4, 1));
+
+ // snapshot 3: compact
+ compact(table, row(1), 0);
+
+ table.createTag("TAG1", 1);
+ table.createTag("TAG2", 2);
+ table.createTag("TAG3", 3);
+
+ // read tag1 tag2
+ // pk 1 was deleted, pk 2 updated and pk 4 inserted; only the audit log reports the -D row.
+ List<InternalRow> result = read(table, Pair.of(INCREMENTAL_BETWEEN,
"TAG1,TAG2"));
+ assertThat(result)
+ .containsExactlyInAnyOrder(GenericRow.of(1, 2, 2),
GenericRow.of(1, 4, 1));
+ result = read(auditLog, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG2"));
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ GenericRow.of(fromString("-D"), 1, 1, 1),
+ GenericRow.of(fromString("+I"), 1, 2, 2),
+ GenericRow.of(fromString("+I"), 1, 4, 1));
+
+ // read tag1 tag3
+ // Snapshot 3 only compacts, so the expected diff equals the TAG1..TAG2 one: unchanged rows
+ // such as (1, 3, 1) and (2, 1, 1) must not show up.
+ result = read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3"));
+ assertThat(result)
+ .containsExactlyInAnyOrder(GenericRow.of(1, 2, 2),
GenericRow.of(1, 4, 1));
+ result = read(auditLog, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3"));
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ GenericRow.of(fromString("-D"), 1, 1, 1),
+ GenericRow.of(fromString("+I"), 1, 2, 2),
+ GenericRow.of(fromString("+I"), 1, 4, 1));
+
+ // The end tag must point to a newer snapshot than the start tag.
+ assertThatThrownBy(() -> read(table, Pair.of(INCREMENTAL_BETWEEN,
"TAG2,TAG1")))
+ .hasMessageContaining(
+ "Tag end TAG1 with snapshot id 1 should be larger than
tag start TAG2 with snapshot id 2");
+ }
+
+ /**
+ * Incremental read between tags of a table without primary key: only the row written after
+ * TAG1 is returned.
+ *
+ * <p>NOTE(review): AppendOnlyFileStoreRead only logs and skips the before files of a split. As a
+ * result, rows that were removed between the tags are not reported. Rows whose files were
+ * merely rewritten (e.g. by compaction) between the tags would presumably be returned again.
+ * This test covers plain appends only; confirm the other cases.
+ */
+ @Test
+ public void testAppendTableTag() throws Exception {
+ Identifier identifier = identifier("T");
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("pk", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .build();
+ catalog.createTable(identifier, schema, true);
+ Table table = catalog.getTable(identifier);
+
+ write(table, GenericRow.of(1, 1, 1));
+ write(table, GenericRow.of(1, 1, 2));
+
+ table.createTag("TAG1", 1);
+ table.createTag("TAG2", 2);
+
+ // The file of snapshot 1 is shared by both tags and is deduplicated away.
+ assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG2")))
+ .containsExactlyInAnyOrder(GenericRow.of(1, 1, 2));
+ }
}