This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 71918811d [core] Introduce incremental-between read by timestamp
(#1470)
71918811d is described below
commit 71918811d0014e2948f5cd4382f55f52c256e46f
Author: pongandnoon <[email protected]>
AuthorDate: Thu Jul 6 11:01:52 2023 +0800
[core] Introduce incremental-between read by timestamp (#1470)
---
.../shortcodes/generated/core_configuration.html | 8 +-
.../main/java/org/apache/paimon/CoreOptions.java | 26 ++-
.../org/apache/paimon/schema/SchemaValidation.java | 28 ++-
.../table/source/AbstractInnerTableScan.java | 15 +-
.../snapshot/IncrementalStartingScanner.java | 11 +-
.../IncrementalTimeStampStartingScanner.java | 52 +++++
.../table/IncrementalTimeStampTableTest.java | 220 +++++++++++++++++++++
7 files changed, 341 insertions(+), 19 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 6cb820349..9df8991c8 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -164,6 +164,12 @@ under the License.
<td>String</td>
<td>Read incremental changes between start snapshot (exclusive)
and end snapshot, for example, '5,10' means changes between snapshot 5 and
snapshot 10.</td>
</tr>
+ <tr>
+ <td><h5>incremental-between-timestamp</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Read incremental changes between start timestamp (exclusive)
and end timestamp, for example, 't1,t2' means changes between timestamp t1 and
timestamp t2.</td>
+ </tr>
<tr>
<td><h5>local-sort.max-num-file-handles</h5></td>
<td style="word-wrap: break-word;">128</td>
@@ -366,7 +372,7 @@ under the License.
<td><h5>scan.mode</h5></td>
<td style="word-wrap: break-word;">default</td>
<td><p>Enum</p></td>
- <td>Specify the scanning behavior of the source.<br /><br
/>Possible values:<ul><li>"default": Determines actual startup mode according
to other table properties. If "scan.timestamp-millis" is set the actual startup
mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is
set the actual startup mode will be "from-snapshot". Otherwise the actual
startup mode will be "latest-full".</li><li>"latest-full": For streaming
sources, produces the latest snapshot [...]
+ <td>Specify the scanning behavior of the source.<br /><br
/>Possible values:<ul><li>"default": Determines actual startup mode according
to other table properties. If "scan.timestamp-millis" is set the actual startup
mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is
set the actual startup mode will be "from-snapshot". Otherwise the actual
startup mode will be "latest-full".</li><li>"latest-full": For streaming
sources, produces the latest snapshot [...]
</tr>
<tr>
<td><h5>scan.plan-sort-partition</h5></td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 0a1db378f..13325d536 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -697,6 +697,13 @@ public class CoreOptions implements Serializable {
.withDescription(
"Read incremental changes between start snapshot
(exclusive) and end snapshot, "
+ "for example, '5,10' means changes
between snapshot 5 and snapshot 10.");
+ public static final ConfigOption<String> INCREMENTAL_BETWEEN_TIMESTAMP =
+ key("incremental-between-timestamp")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Read incremental changes between start timestamp
(exclusive) and end timestamp, "
+ + "for example, 't1,t2' means changes
between timestamp t1 and timestamp t2.");
public static final String STATS_MODE_SUFFIX = "stats-mode";
@@ -965,7 +972,8 @@ public class CoreOptions implements Serializable {
} else if (options.getOptional(SCAN_SNAPSHOT_ID).isPresent()
|| options.getOptional(SCAN_TAG_NAME).isPresent()) {
return StartupMode.FROM_SNAPSHOT;
- } else if (options.getOptional(INCREMENTAL_BETWEEN).isPresent()) {
+ } else if (options.getOptional(INCREMENTAL_BETWEEN).isPresent()
+ ||
options.getOptional(INCREMENTAL_BETWEEN_TIMESTAMP).isPresent()) {
return StartupMode.INCREMENTAL;
} else {
return StartupMode.LATEST_FULL;
@@ -996,14 +1004,17 @@ public class CoreOptions implements Serializable {
public Pair<String, String> incrementalBetween() {
String str = options.get(INCREMENTAL_BETWEEN);
if (str == null) {
- return null;
+ str = options.get(INCREMENTAL_BETWEEN_TIMESTAMP);
+ if (str == null) {
+ return null;
+ }
}
String[] split = str.split(",");
if (split.length != 2) {
throw new IllegalArgumentException(
- "The incremental-between must specific start snapshot
(exclusive) and end snapshot,"
- + " for example, '5,10' means changes between
snapshot 5 and snapshot 10. But is: "
+ "The incremental-between or incremental-between-timestamp
must specific start(exclusive) and end snapshot or timestamp,"
+ + " for example, 'incremental-between'='5,10'
means changes between snapshot 5 and snapshot 10. But is: "
+ str);
}
return Pair.of(split[0], split[1]);
@@ -1192,7 +1203,8 @@ public class CoreOptions implements Serializable {
+ "produces a snapshot specified by
\"scan.snapshot-id\" but does not read new changes."),
INCREMENTAL(
- "incremental", "Read incremental changes between start
snapshot and end snapshot.");
+ "incremental",
+ "Read incremental changes between start and end snapshot or
timestamp.");
private final String value;
private final String description;
@@ -1449,7 +1461,9 @@ public class CoreOptions implements Serializable {
options.set(SCAN_MODE, StartupMode.FROM_SNAPSHOT);
}
- if (options.contains(INCREMENTAL_BETWEEN) &&
!options.contains(SCAN_MODE)) {
+ if ((options.contains(INCREMENTAL_BETWEEN_TIMESTAMP)
+ || options.contains(INCREMENTAL_BETWEEN))
+ && !options.contains(SCAN_MODE)) {
options.set(SCAN_MODE, StartupMode.INCREMENTAL);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index fee830427..044f6b1f6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -42,6 +42,7 @@ import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
+import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
import static org.apache.paimon.CoreOptions.SCAN_MODE;
import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
@@ -191,31 +192,48 @@ public class SchemaValidation {
options, SCAN_TIMESTAMP_MILLIS,
CoreOptions.StartupMode.FROM_TIMESTAMP);
checkOptionsConflict(
options,
- Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME,
INCREMENTAL_BETWEEN),
+ Arrays.asList(
+ SCAN_SNAPSHOT_ID,
+ SCAN_TAG_NAME,
+ INCREMENTAL_BETWEEN_TIMESTAMP,
+ INCREMENTAL_BETWEEN),
Collections.singletonList(SCAN_TIMESTAMP_MILLIS));
} else if (options.startupMode() ==
CoreOptions.StartupMode.FROM_SNAPSHOT) {
checkExactOneOptionExistInMode(
options, options.startupMode(), SCAN_SNAPSHOT_ID,
SCAN_TAG_NAME);
checkOptionsConflict(
options,
- Arrays.asList(SCAN_TIMESTAMP_MILLIS, INCREMENTAL_BETWEEN),
+ Arrays.asList(
+ SCAN_TIMESTAMP_MILLIS,
+ INCREMENTAL_BETWEEN_TIMESTAMP,
+ INCREMENTAL_BETWEEN),
Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME));
} else if (options.startupMode() ==
CoreOptions.StartupMode.INCREMENTAL) {
- checkOptionExistInMode(options, INCREMENTAL_BETWEEN,
options.startupMode());
+ checkExactOneOptionExistInMode(
+ options,
+ options.startupMode(),
+ INCREMENTAL_BETWEEN,
+ INCREMENTAL_BETWEEN_TIMESTAMP);
checkOptionsConflict(
options,
Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TIMESTAMP_MILLIS,
SCAN_TAG_NAME),
- Collections.singletonList(INCREMENTAL_BETWEEN));
+ Arrays.asList(INCREMENTAL_BETWEEN,
INCREMENTAL_BETWEEN_TIMESTAMP));
} else if (options.startupMode() ==
CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
checkOptionExistInMode(options, SCAN_SNAPSHOT_ID,
options.startupMode());
checkOptionsConflict(
options,
- Arrays.asList(SCAN_TIMESTAMP_MILLIS, SCAN_TAG_NAME,
INCREMENTAL_BETWEEN),
+ Arrays.asList(
+ SCAN_TIMESTAMP_MILLIS,
+ SCAN_TAG_NAME,
+ INCREMENTAL_BETWEEN_TIMESTAMP,
+ INCREMENTAL_BETWEEN),
Collections.singletonList(SCAN_SNAPSHOT_ID));
} else {
checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS,
options.startupMode());
checkOptionNotExistInMode(options, SCAN_SNAPSHOT_ID,
options.startupMode());
checkOptionNotExistInMode(options, SCAN_TAG_NAME,
options.startupMode());
+ checkOptionNotExistInMode(
+ options, INCREMENTAL_BETWEEN_TIMESTAMP,
options.startupMode());
checkOptionNotExistInMode(options, INCREMENTAL_BETWEEN,
options.startupMode());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 3a3d7ee62..374050af6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -34,6 +34,7 @@ import
org.apache.paimon.table.source.snapshot.ContinuousLatestStartingScanner;
import org.apache.paimon.table.source.snapshot.FullCompactedStartingScanner;
import org.apache.paimon.table.source.snapshot.FullStartingScanner;
import org.apache.paimon.table.source.snapshot.IncrementalStartingScanner;
+import
org.apache.paimon.table.source.snapshot.IncrementalTimeStampStartingScanner;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import
org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner;
@@ -134,10 +135,16 @@ public abstract class AbstractInnerTableScan implements
InnerTableScan {
: new
StaticFromSnapshotStartingScanner(options.scanSnapshotId());
case INCREMENTAL:
checkArgument(!isStreaming, "Cannot read incremental in
streaming mode.");
- Pair<String, String> incremental =
options.incrementalBetween();
- return new IncrementalStartingScanner(
- Long.parseLong(incremental.getLeft()),
- Long.parseLong(incremental.getRight()));
+ Pair<String, String> incrementalBetween =
options.incrementalBetween();
+ if (options.toMap().get(CoreOptions.INCREMENTAL_BETWEEN.key())
!= null) {
+ return new IncrementalStartingScanner(
+ Long.parseLong(incrementalBetween.getLeft()),
+ Long.parseLong(incrementalBetween.getRight()));
+ } else {
+ return new IncrementalTimeStampStartingScanner(
+ Long.parseLong(incrementalBetween.getLeft()),
+ Long.parseLong(incrementalBetween.getRight()));
+ }
default:
throw new UnsupportedOperationException(
"Unknown startup mode " + startupMode.name());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index 74972f2ab..40d40c830 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -33,11 +33,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-/** {@link StartingScanner} for incremental changes. */
+/** {@link StartingScanner} for incremental changes by snapshot. */
public class IncrementalStartingScanner implements StartingScanner {
- private final long start;
- private final long end;
+ private long start;
+ private long end;
public IncrementalStartingScanner(long start, long end) {
this.start = start;
@@ -46,6 +46,11 @@ public class IncrementalStartingScanner implements
StartingScanner {
@Override
public Result scan(SnapshotManager manager, SnapshotReader reader) {
+ long earliestSnapshotId = manager.earliestSnapshotId();
+ long latestSnapshotId = manager.latestSnapshotId();
+ start = (start < earliestSnapshotId) ? earliestSnapshotId - 1 : start;
+ end = (end > latestSnapshotId) ? latestSnapshotId : end;
+
Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new
HashMap<>();
for (long i = start + 1; i < end + 1; i++) {
List<DataSplit> splits = readDeltaSplits(reader,
manager.snapshot(i));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
new file mode 100644
index 000000000..ed8613a26
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.snapshot;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.utils.SnapshotManager;
+
+/** {@link StartingScanner} for incremental changes by timestamp. */
+public class IncrementalTimeStampStartingScanner implements StartingScanner {
+
+ private final long startTimestamp;
+ private final long endTimestamp;
+
+ public IncrementalTimeStampStartingScanner(long startTimestamp, long
endTimestamp) {
+ this.startTimestamp = startTimestamp;
+ this.endTimestamp = endTimestamp;
+ }
+
+ @Override
+ public Result scan(SnapshotManager manager, SnapshotReader reader) {
+ Snapshot earliestSnapshot =
manager.snapshot(manager.earliestSnapshotId());
+ Snapshot latestSnapshot = manager.latestSnapshot();
+ if (startTimestamp > latestSnapshot.timeMillis()
+ || endTimestamp < earliestSnapshot.timeMillis()) {
+ return new NoSnapshot();
+ }
+ Snapshot startSnapshot =
manager.earlierOrEqualTimeMills(startTimestamp);
+ Long startSnapshotId =
+ (startSnapshot == null) ? earliestSnapshot.id() - 1 :
startSnapshot.id();
+ Snapshot endSnapshot = manager.earlierOrEqualTimeMills(endTimestamp);
+ Long endSnapshotId = (endSnapshot == null) ? latestSnapshot.id() :
endSnapshot.id();
+ IncrementalStartingScanner incrementalStartingScanner =
+ new IncrementalStartingScanner(startSnapshotId, endSnapshotId);
+ return incrementalStartingScanner.scan(manager, reader);
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
new file mode 100644
index 000000000..56486ea8a
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
+import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CoreOptions#INCREMENTAL_BETWEEN_TIMESTAMP}. */
+public class IncrementalTimeStampTableTest extends TableTestBase {
+
+ @Test
+ public void testPrimaryKeyTable() throws Exception {
+ Identifier identifier = identifier("T");
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("pk", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .build();
+ catalog.createTable(identifier, schema, true);
+ Table table = catalog.getTable(identifier);
+ Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse,
database, "T"));
+ SnapshotManager snapshotManager = new
SnapshotManager(LocalFileIO.create(), tablePath);
+
+ Long timestampEarliest = System.currentTimeMillis();
+ // snapshot 1: append
+ write(
+ table,
+ GenericRow.of(1, 1, 1),
+ GenericRow.of(1, 2, 1),
+ GenericRow.of(1, 3, 1),
+ GenericRow.of(2, 1, 1));
+
+ // snapshot 2: append
+ write(
+ table,
+ GenericRow.of(1, 1, 2),
+ GenericRow.of(1, 2, 2),
+ GenericRow.of(1, 4, 1),
+ GenericRow.of(2, 1, 2));
+ Long timestampSnapshot2 = snapshotManager.snapshot(2).timeMillis();
+
+ // snapshot 3: compact
+ compact(table, row(1), 0);
+
+ // snapshot 4: append
+ write(
+ table,
+ GenericRow.of(1, 1, 3),
+ GenericRow.of(1, 2, 3),
+ GenericRow.of(2, 1, 3),
+ GenericRow.of(2, 2, 1));
+
+ // snapshot 5: append
+ write(table, GenericRow.of(1, 1, 4), GenericRow.of(1, 2, 4),
GenericRow.of(2, 1, 4));
+ Long timestampSnapshot4 = snapshotManager.snapshot(5).timeMillis();
+
+ // snapshot 6: append
+ write(table, GenericRow.of(1, 1, 5), GenericRow.of(1, 2, 5),
GenericRow.of(2, 1, 5));
+
+ List<InternalRow> result1 =
+ read(
+ table,
+ Pair.of(
+ INCREMENTAL_BETWEEN_TIMESTAMP,
+ String.format("%s,%s", timestampEarliest - 1,
timestampEarliest)));
+ assertThat(result1).isEmpty();
+
+ List<InternalRow> result2 =
+ read(
+ table,
+ Pair.of(
+ INCREMENTAL_BETWEEN_TIMESTAMP,
+ String.format("%s,%s", timestampEarliest,
timestampSnapshot2)));
+ assertThat(result2)
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 1, 2),
+ GenericRow.of(1, 2, 2),
+ GenericRow.of(1, 3, 1),
+ GenericRow.of(1, 4, 1),
+ GenericRow.of(2, 1, 2));
+
+ List<InternalRow> result3 =
+ read(
+ table,
+ Pair.of(
+ INCREMENTAL_BETWEEN_TIMESTAMP,
+ String.format("%s,%s", timestampSnapshot2,
timestampSnapshot4)));
+ assertThat(result3)
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 1, 4),
+ GenericRow.of(1, 2, 4),
+ GenericRow.of(2, 1, 4),
+ GenericRow.of(2, 2, 1));
+ }
+
+ @Test
+ public void testAppendTable() throws Exception {
+ Identifier identifier = identifier("T");
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("pk", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .build();
+ catalog.createTable(identifier, schema, true);
+ Table table = catalog.getTable(identifier);
+ Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse,
database, "T"));
+ SnapshotManager snapshotManager = new
SnapshotManager(LocalFileIO.create(), tablePath);
+ Long timestampEarliest = System.currentTimeMillis();
+ // snapshot 1: append
+ write(
+ table,
+ GenericRow.of(1, 1, 1),
+ GenericRow.of(1, 2, 1),
+ GenericRow.of(1, 3, 1),
+ GenericRow.of(2, 1, 1));
+
+ // snapshot 2: append
+ write(
+ table,
+ GenericRow.of(1, 1, 2),
+ GenericRow.of(1, 2, 2),
+ GenericRow.of(1, 4, 1),
+ GenericRow.of(2, 1, 2));
+ Long timestampSnapshot2 = snapshotManager.snapshot(2).timeMillis();
+
+ // snapshot 3: append
+ write(
+ table,
+ GenericRow.of(1, 1, 3),
+ GenericRow.of(1, 2, 3),
+ GenericRow.of(2, 1, 3),
+ GenericRow.of(2, 2, 1));
+
+ // snapshot 4: append
+ write(table, GenericRow.of(1, 1, 4), GenericRow.of(1, 2, 4),
GenericRow.of(2, 1, 4));
+
+ // snapshot 5: append
+ write(table, GenericRow.of(1, 1, 5), GenericRow.of(1, 2, 5),
GenericRow.of(2, 1, 5));
+
+ Long timestampSnapshot4 = snapshotManager.snapshot(4).timeMillis();
+
+ List<InternalRow> result1 =
+ read(
+ table,
+ Pair.of(
+ INCREMENTAL_BETWEEN_TIMESTAMP,
+ String.format("%s,%s", timestampEarliest - 1,
timestampEarliest)));
+ assertThat(result1).isEmpty();
+
+ List<InternalRow> result2 =
+ read(
+ table,
+ Pair.of(
+ INCREMENTAL_BETWEEN_TIMESTAMP,
+ String.format("%s,%s", timestampEarliest,
timestampSnapshot2)));
+ assertThat(result2)
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 1, 1),
+ GenericRow.of(1, 1, 2),
+ GenericRow.of(1, 2, 1),
+ GenericRow.of(1, 2, 2),
+ GenericRow.of(1, 3, 1),
+ GenericRow.of(1, 4, 1),
+ GenericRow.of(2, 1, 1),
+ GenericRow.of(2, 1, 2));
+
+ List<InternalRow> result3 =
+ read(
+ table,
+ Pair.of(
+ INCREMENTAL_BETWEEN_TIMESTAMP,
+ String.format("%s,%s", timestampSnapshot2,
timestampSnapshot4)));
+ assertThat(result3)
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 1, 3),
+ GenericRow.of(1, 2, 3),
+ GenericRow.of(2, 1, 3),
+ GenericRow.of(2, 2, 1),
+ GenericRow.of(1, 1, 4),
+ GenericRow.of(1, 2, 4),
+ GenericRow.of(2, 1, 4));
+ }
+}