This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 457edbb46 [core] Introduce 'scan.version' to refactor spark time
travel (#2724)
457edbb46 is described below
commit 457edbb46b79130ae39a890c94feae39b82029fa
Author: yuzelin <[email protected]>
AuthorDate: Wed Jan 17 19:42:42 2024 +0800
[core] Introduce 'scan.version' to refactor spark time travel (#2724)
---
docs/content/how-to/querying-tables.md | 6 ++
docs/content/maintenance/manage-tags.md | 2 +-
.../main/java/org/apache/paimon/CoreOptions.java | 16 +++++-
.../paimon/table/AbstractFileStoreTable.java | 65 +++++++++++++++-------
.../java/org/apache/paimon/spark/SparkCatalog.java | 15 +----
.../apache/paimon/spark/SparkTimeTravelITCase.java | 27 ++++++++-
6 files changed, 95 insertions(+), 36 deletions(-)
diff --git a/docs/content/how-to/querying-tables.md
b/docs/content/how-to/querying-tables.md
index 6b2d8ac23..0c23df252 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -87,6 +87,12 @@ SELECT * FROM t TIMESTAMP AS OF 1678883047;
SELECT * FROM t VERSION AS OF 'my-tag';
```
+{{< hint warning >}}
+If a tag's name is a number and equals a snapshot id, the VERSION AS OF syntax will consider the tag first. For example, if
+you have a tag named '1' based on snapshot 2, the statement `SELECT * FROM t VERSION AS OF '1'` actually queries snapshot 2
+instead of snapshot 1.
+{{< /hint >}}
+
{{< /tab >}}
{{< tab "Spark3-DF" >}}
diff --git a/docs/content/maintenance/manage-tags.md
b/docs/content/maintenance/manage-tags.md
index cbc2d859a..667527b61 100644
--- a/docs/content/maintenance/manage-tags.md
+++ b/docs/content/maintenance/manage-tags.md
@@ -92,7 +92,7 @@ See [Query Tables]({{< ref "how-to/querying-tables" >}}) to
see more query for e
## Create Tags
-You can create a tag with given name (cannot be number) and snapshot ID.
+You can create a tag with given name and snapshot ID.
{{< tabs "create-tag" >}}
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 5d9e00a57..1c9af72ce 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -529,6 +529,15 @@ public class CoreOptions implements Serializable {
.withDescription(
"Optional tag name used in case of
\"from-snapshot\" scan mode.");
+ @ExcludeFromDocumentation("Internal use only")
+ public static final ConfigOption<String> SCAN_VERSION =
+ key("scan.version")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Specify the time travel version string used in
'VERSION AS OF' syntax. "
+ + "We will use tag when both tag and
snapshot of that version exist.");
+
public static final ConfigOption<Long> SCAN_BOUNDED_WATERMARK =
key("scan.bounded.watermark")
.longType()
@@ -1334,7 +1343,8 @@ public class CoreOptions implements Serializable {
if (options.getOptional(SCAN_TIMESTAMP_MILLIS).isPresent()) {
return StartupMode.FROM_TIMESTAMP;
} else if (options.getOptional(SCAN_SNAPSHOT_ID).isPresent()
- || options.getOptional(SCAN_TAG_NAME).isPresent()) {
+ || options.getOptional(SCAN_TAG_NAME).isPresent()
+ || options.getOptional(SCAN_VERSION).isPresent()) {
return StartupMode.FROM_SNAPSHOT;
} else if
(options.getOptional(SCAN_FILE_CREATION_TIME_MILLIS).isPresent()) {
return StartupMode.FROM_FILE_CREATION_TIME;
@@ -1371,6 +1381,10 @@ public class CoreOptions implements Serializable {
return options.get(SCAN_TAG_NAME);
}
+ public String scanVersion() {
+ return options.get(SCAN_VERSION);
+ }
+
public Pair<String, String> incrementalBetween() {
String str = options.get(INCREMENTAL_BETWEEN);
if (str == null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 3185de4c3..f9da880d2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -54,6 +54,8 @@ import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
@@ -212,14 +214,14 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
// copy a new table schema to contain dynamic options
TableSchema newTableSchema = tableSchema.copy(newOptions.toMap());
- // validate schema with new options
- SchemaValidation.validateTableSchema(newTableSchema);
-
if (tryTimeTravel) {
// see if merged options contain time travel option
newTableSchema = tryTimeTravel(newOptions).orElse(newTableSchema);
}
+ // validate schema with new options
+ SchemaValidation.validateTableSchema(newTableSchema);
+
return copy(newTableSchema);
}
@@ -314,35 +316,56 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
switch (coreOptions.startupMode()) {
case FROM_SNAPSHOT:
case FROM_SNAPSHOT_FULL:
- if (coreOptions.scanSnapshotId() != null) {
- long snapshotId = coreOptions.scanSnapshotId();
- if (snapshotManager().snapshotExists(snapshotId)) {
- long schemaId =
snapshotManager().snapshot(snapshotId).schemaId();
- return
Optional.of(schemaManager().schema(schemaId).copy(options.toMap()));
- }
+ if (coreOptions.scanVersion() != null) {
+ return travelToVersion(coreOptions.scanVersion(), options);
+ } else if (coreOptions.scanSnapshotId() != null) {
+ return travelToSnapshot(coreOptions.scanSnapshotId(),
options);
} else {
- String tagName = coreOptions.scanTagName();
- TagManager tagManager = tagManager();
- if (tagManager.tagExists(tagName)) {
- long schemaId =
tagManager.taggedSnapshot(tagName).schemaId();
- return
Optional.of(schemaManager().schema(schemaId).copy(options.toMap()));
- }
+ return travelToTag(coreOptions.scanTagName(), options);
}
- return Optional.empty();
case FROM_TIMESTAMP:
Snapshot snapshot =
StaticFromTimestampStartingScanner.timeTravelToTimestamp(
snapshotManager(),
coreOptions.scanTimestampMills());
- if (snapshot != null) {
- long schemaId = snapshot.schemaId();
- return
Optional.of(schemaManager().schema(schemaId).copy(options.toMap()));
- }
- return Optional.empty();
+ return travelToSnapshot(snapshot, options);
default:
return Optional.empty();
}
}
+ /** Tag first when travelling to a version. */
+ private Optional<TableSchema> travelToVersion(String version, Options
options) {
+ options.remove(CoreOptions.SCAN_VERSION.key());
+ if (tagManager().tagExists(version)) {
+ options.set(CoreOptions.SCAN_TAG_NAME, version);
+ return travelToTag(version, options);
+ } else if (version.chars().allMatch(Character::isDigit)) {
+ options.set(CoreOptions.SCAN_SNAPSHOT_ID.key(), version);
+ return travelToSnapshot(Long.parseLong(version), options);
+ } else {
+ throw new RuntimeException("Cannot find a time travel version for
" + version);
+ }
+ }
+
+ private Optional<TableSchema> travelToTag(String tagName, Options options)
{
+ return travelToSnapshot(tagManager().taggedSnapshot(tagName), options);
+ }
+
+ private Optional<TableSchema> travelToSnapshot(long snapshotId, Options
options) {
+ SnapshotManager snapshotManager = snapshotManager();
+ if (snapshotManager.snapshotExists(snapshotId)) {
+ return travelToSnapshot(snapshotManager.snapshot(snapshotId),
options);
+ }
+ return Optional.empty();
+ }
+
+ private Optional<TableSchema> travelToSnapshot(@Nullable Snapshot
snapshot, Options options) {
+ if (snapshot != null) {
+ return
Optional.of(schemaManager().schema(snapshot.schemaId()).copy(options.toMap()));
+ }
+ return Optional.empty();
+ }
+
@Override
public void rollbackTo(long snapshotId) {
SnapshotManager snapshotManager = snapshotManager();
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 438181360..359f04ff2 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -223,18 +223,9 @@ public class SparkCatalog extends SparkBaseCatalog {
*/
public SparkTable loadTable(Identifier ident, String version) throws
NoSuchTableException {
Table table = loadPaimonTable(ident);
- Options dynamicOptions = new Options();
-
- if (version.chars().allMatch(Character::isDigit)) {
- long snapshotId = Long.parseUnsignedLong(version);
- LOG.info("Time travel to snapshot '{}'.", snapshotId);
- dynamicOptions.set(CoreOptions.SCAN_SNAPSHOT_ID, snapshotId);
- } else {
- LOG.info("Time travel to tag '{}'.", version);
- dynamicOptions.set(CoreOptions.SCAN_TAG_NAME, version);
- }
-
- return new SparkTable(table.copy(dynamicOptions.toMap()));
+ LOG.info("Time travel to version '{}'.", version);
+ return new SparkTable(
+
table.copy(Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), version)));
}
/**
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
index 20c87df89..ce9e72386 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
@@ -235,7 +235,8 @@ public class SparkTimeTravelITCase extends
SparkReadTestBase {
() -> spark.sql("SELECT * FROM t VERSION AS OF
'unknown'").collectAsList())
.satisfies(
AssertionUtils.anyCauseMatches(
- IllegalArgumentException.class, "Tag 'unknown'
doesn't exist."));
+ RuntimeException.class,
+ "Cannot find a time travel version for
unknown"));
}
@Test
@@ -274,4 +275,28 @@ public class SparkTimeTravelITCase extends
SparkReadTestBase {
assertThat(spark.sql("SELECT * FROM t VERSION AS OF
'tag2'").collectAsList().toString())
.isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
}
+
+ @Test
+ public void testTravelToTagWithDigitalName() throws Exception {
+ spark.sql("CREATE TABLE t (k INT, v STRING)");
+
+ // snapshot 1
+ writeTable(
+ "t",
+ GenericRow.of(1, BinaryString.fromString("Hello")),
+ GenericRow.of(2, BinaryString.fromString("Paimon")));
+
+ // snapshot 2
+ writeTable(
+ "t",
+ GenericRow.of(3, BinaryString.fromString("Test")),
+ GenericRow.of(4, BinaryString.fromString("Case")));
+
+ FileStoreTable table = getTable("t");
+ table.createTag("1", 2);
+
+ // time travel to tag '1'
+ assertThat(spark.sql("SELECT * FROM t VERSION AS OF
'1'").collectAsList().toString())
+ .isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
+ }
}