This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 457edbb46 [core] Introduce 'scan.version' to refactor spark time 
travel (#2724)
457edbb46 is described below

commit 457edbb46b79130ae39a890c94feae39b82029fa
Author: yuzelin <[email protected]>
AuthorDate: Wed Jan 17 19:42:42 2024 +0800

    [core] Introduce 'scan.version' to refactor spark time travel (#2724)
---
 docs/content/how-to/querying-tables.md             |  6 ++
 docs/content/maintenance/manage-tags.md            |  2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   | 16 +++++-
 .../paimon/table/AbstractFileStoreTable.java       | 65 +++++++++++++++-------
 .../java/org/apache/paimon/spark/SparkCatalog.java | 15 +----
 .../apache/paimon/spark/SparkTimeTravelITCase.java | 27 ++++++++-
 6 files changed, 95 insertions(+), 36 deletions(-)

diff --git a/docs/content/how-to/querying-tables.md 
b/docs/content/how-to/querying-tables.md
index 6b2d8ac23..0c23df252 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -87,6 +87,12 @@ SELECT * FROM t TIMESTAMP AS OF 1678883047;
 SELECT * FROM t VERSION AS OF 'my-tag';
 ```
 
+{{< hint warning >}}
+If a tag's name is a number that equals a snapshot id, the VERSION AS OF 
syntax will resolve the tag first. For example, if 
+you have a tag named '1' based on snapshot 2, the statement `SELECT * FROM t 
VERSION AS OF '1'` actually queries snapshot 2 
+instead of snapshot 1.
+{{< /hint >}}
+
 {{< /tab >}}
 
 {{< tab "Spark3-DF" >}}
diff --git a/docs/content/maintenance/manage-tags.md 
b/docs/content/maintenance/manage-tags.md
index cbc2d859a..667527b61 100644
--- a/docs/content/maintenance/manage-tags.md
+++ b/docs/content/maintenance/manage-tags.md
@@ -92,7 +92,7 @@ See [Query Tables]({{< ref "how-to/querying-tables" >}}) to 
see more query for e
 
 ## Create Tags
 
-You can create a tag with given name (cannot be number) and snapshot ID.
+You can create a tag with a given name and snapshot ID.
 
 {{< tabs "create-tag" >}}
 
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 5d9e00a57..1c9af72ce 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -529,6 +529,15 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Optional tag name used in case of 
\"from-snapshot\" scan mode.");
 
+    @ExcludeFromDocumentation("Internal use only")
+    public static final ConfigOption<String> SCAN_VERSION =
+            key("scan.version")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specify the time travel version string used in 
'VERSION AS OF' syntax. "
+                                    + "We will use tag when both tag and 
snapshot of that version exist.");
+
     public static final ConfigOption<Long> SCAN_BOUNDED_WATERMARK =
             key("scan.bounded.watermark")
                     .longType()
@@ -1334,7 +1343,8 @@ public class CoreOptions implements Serializable {
             if (options.getOptional(SCAN_TIMESTAMP_MILLIS).isPresent()) {
                 return StartupMode.FROM_TIMESTAMP;
             } else if (options.getOptional(SCAN_SNAPSHOT_ID).isPresent()
-                    || options.getOptional(SCAN_TAG_NAME).isPresent()) {
+                    || options.getOptional(SCAN_TAG_NAME).isPresent()
+                    || options.getOptional(SCAN_VERSION).isPresent()) {
                 return StartupMode.FROM_SNAPSHOT;
             } else if 
(options.getOptional(SCAN_FILE_CREATION_TIME_MILLIS).isPresent()) {
                 return StartupMode.FROM_FILE_CREATION_TIME;
@@ -1371,6 +1381,10 @@ public class CoreOptions implements Serializable {
         return options.get(SCAN_TAG_NAME);
     }
 
+    public String scanVersion() {
+        return options.get(SCAN_VERSION);
+    }
+
     public Pair<String, String> incrementalBetween() {
         String str = options.get(INCREMENTAL_BETWEEN);
         if (str == null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 3185de4c3..f9da880d2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -54,6 +54,8 @@ import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
@@ -212,14 +214,14 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
         // copy a new table schema to contain dynamic options
         TableSchema newTableSchema = tableSchema.copy(newOptions.toMap());
 
-        // validate schema with new options
-        SchemaValidation.validateTableSchema(newTableSchema);
-
         if (tryTimeTravel) {
             // see if merged options contain time travel option
             newTableSchema = tryTimeTravel(newOptions).orElse(newTableSchema);
         }
 
+        // validate schema with new options
+        SchemaValidation.validateTableSchema(newTableSchema);
+
         return copy(newTableSchema);
     }
 
@@ -314,35 +316,56 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
         switch (coreOptions.startupMode()) {
             case FROM_SNAPSHOT:
             case FROM_SNAPSHOT_FULL:
-                if (coreOptions.scanSnapshotId() != null) {
-                    long snapshotId = coreOptions.scanSnapshotId();
-                    if (snapshotManager().snapshotExists(snapshotId)) {
-                        long schemaId = 
snapshotManager().snapshot(snapshotId).schemaId();
-                        return 
Optional.of(schemaManager().schema(schemaId).copy(options.toMap()));
-                    }
+                if (coreOptions.scanVersion() != null) {
+                    return travelToVersion(coreOptions.scanVersion(), options);
+                } else if (coreOptions.scanSnapshotId() != null) {
+                    return travelToSnapshot(coreOptions.scanSnapshotId(), 
options);
                 } else {
-                    String tagName = coreOptions.scanTagName();
-                    TagManager tagManager = tagManager();
-                    if (tagManager.tagExists(tagName)) {
-                        long schemaId = 
tagManager.taggedSnapshot(tagName).schemaId();
-                        return 
Optional.of(schemaManager().schema(schemaId).copy(options.toMap()));
-                    }
+                    return travelToTag(coreOptions.scanTagName(), options);
                 }
-                return Optional.empty();
             case FROM_TIMESTAMP:
                 Snapshot snapshot =
                         
StaticFromTimestampStartingScanner.timeTravelToTimestamp(
                                 snapshotManager(), 
coreOptions.scanTimestampMills());
-                if (snapshot != null) {
-                    long schemaId = snapshot.schemaId();
-                    return 
Optional.of(schemaManager().schema(schemaId).copy(options.toMap()));
-                }
-                return Optional.empty();
+                return travelToSnapshot(snapshot, options);
             default:
                 return Optional.empty();
         }
     }
 
+    /** Tag first when travelling to a version. */
+    private Optional<TableSchema> travelToVersion(String version, Options 
options) {
+        options.remove(CoreOptions.SCAN_VERSION.key());
+        if (tagManager().tagExists(version)) {
+            options.set(CoreOptions.SCAN_TAG_NAME, version);
+            return travelToTag(version, options);
+        } else if (version.chars().allMatch(Character::isDigit)) {
+            options.set(CoreOptions.SCAN_SNAPSHOT_ID.key(), version);
+            return travelToSnapshot(Long.parseLong(version), options);
+        } else {
+            throw new RuntimeException("Cannot find a time travel version for 
" + version);
+        }
+    }
+
+    private Optional<TableSchema> travelToTag(String tagName, Options options) 
{
+        return travelToSnapshot(tagManager().taggedSnapshot(tagName), options);
+    }
+
+    private Optional<TableSchema> travelToSnapshot(long snapshotId, Options 
options) {
+        SnapshotManager snapshotManager = snapshotManager();
+        if (snapshotManager.snapshotExists(snapshotId)) {
+            return travelToSnapshot(snapshotManager.snapshot(snapshotId), 
options);
+        }
+        return Optional.empty();
+    }
+
+    private Optional<TableSchema> travelToSnapshot(@Nullable Snapshot 
snapshot, Options options) {
+        if (snapshot != null) {
+            return 
Optional.of(schemaManager().schema(snapshot.schemaId()).copy(options.toMap()));
+        }
+        return Optional.empty();
+    }
+
     @Override
     public void rollbackTo(long snapshotId) {
         SnapshotManager snapshotManager = snapshotManager();
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 438181360..359f04ff2 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -223,18 +223,9 @@ public class SparkCatalog extends SparkBaseCatalog {
      */
     public SparkTable loadTable(Identifier ident, String version) throws 
NoSuchTableException {
         Table table = loadPaimonTable(ident);
-        Options dynamicOptions = new Options();
-
-        if (version.chars().allMatch(Character::isDigit)) {
-            long snapshotId = Long.parseUnsignedLong(version);
-            LOG.info("Time travel to snapshot '{}'.", snapshotId);
-            dynamicOptions.set(CoreOptions.SCAN_SNAPSHOT_ID, snapshotId);
-        } else {
-            LOG.info("Time travel to tag '{}'.", version);
-            dynamicOptions.set(CoreOptions.SCAN_TAG_NAME, version);
-        }
-
-        return new SparkTable(table.copy(dynamicOptions.toMap()));
+        LOG.info("Time travel to version '{}'.", version);
+        return new SparkTable(
+                
table.copy(Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), version)));
     }
 
     /**
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
index 20c87df89..ce9e72386 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
@@ -235,7 +235,8 @@ public class SparkTimeTravelITCase extends 
SparkReadTestBase {
                         () -> spark.sql("SELECT * FROM t VERSION AS OF 
'unknown'").collectAsList())
                 .satisfies(
                         AssertionUtils.anyCauseMatches(
-                                IllegalArgumentException.class, "Tag 'unknown' 
doesn't exist."));
+                                RuntimeException.class,
+                                "Cannot find a time travel version for 
unknown"));
     }
 
     @Test
@@ -274,4 +275,28 @@ public class SparkTimeTravelITCase extends 
SparkReadTestBase {
         assertThat(spark.sql("SELECT * FROM t VERSION AS OF 
'tag2'").collectAsList().toString())
                 .isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
     }
+
+    @Test
+    public void testTravelToTagWithDigitalName() throws Exception {
+        spark.sql("CREATE TABLE t (k INT, v STRING)");
+
+        // snapshot 1
+        writeTable(
+                "t",
+                GenericRow.of(1, BinaryString.fromString("Hello")),
+                GenericRow.of(2, BinaryString.fromString("Paimon")));
+
+        // snapshot 2
+        writeTable(
+                "t",
+                GenericRow.of(3, BinaryString.fromString("Test")),
+                GenericRow.of(4, BinaryString.fromString("Case")));
+
+        FileStoreTable table = getTable("t");
+        table.createTag("1", 2);
+
+        // time travel to tag '1'
+        assertThat(spark.sql("SELECT * FROM t VERSION AS OF 
'1'").collectAsList().toString())
+                .isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
+    }
 }

Reply via email to