This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9942d4265 [test][doc] Spark time travel with data frame test and doc 
(#1497)
9942d4265 is described below

commit 9942d4265ba56429cb725e7fe57e1eb6cb44a1b1
Author: Liwei Li <[email protected]>
AuthorDate: Thu Jul 6 17:27:52 2023 +0800

    [test][doc] Spark time travel with data frame test and doc (#1497)
---
 docs/content/how-to/querying-tables.md             |  33 ++++
 .../spark/SparkTimeTravelWithDataFrameITCase.java  | 177 +++++++++++++++++++++
 2 files changed, 210 insertions(+)

diff --git a/docs/content/how-to/querying-tables.md 
b/docs/content/how-to/querying-tables.md
index 26cab1dde..952b5f4bf 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -53,6 +53,39 @@ SELECT * FROM t /*+ OPTIONS('scan.tag-name' = 'my-tag') */;
 
 {{< tab "Spark3" >}}
 
+#### DataFrame
+
+To select a specific table snapshot or the snapshot at some time in the 
DataFrame API, Paimon supports:
+
+* `scan.snapshot-id` selects a specific table snapshot
+* `scan.timestamp-millis` selects the current snapshot at a timestamp, in 
milliseconds
+* `scan.tag-name` selects a specific table snapshot by tag name
+
+```scala
+// read the snapshot from specified timestamp in unix seconds
+spark.read
+    .option("scan.timestamp-millis", "1678883047000")
+    .format("paimon")
+    .load("path/to/table")
+```
+
+```scala
+// read the snapshot with id 1L (use snapshot id as version)
+spark.read
+    .option("scan.snapshot-id", 1)
+    .format("paimon")
+    .load("path/to/table")
+```
+
+```scala
+// read tag 'my-tag'
+spark.read
+    .option(CoreOptions.SCAN_TAG_NAME.key(), "my-tag")
+    .format("paimon")
+    .load("path/to/table")
+```
+
+#### SQL
 Requires Spark 3.3+.
 
 you can use `VERSION AS OF` and `TIMESTAMP AS OF` in query to do time travel:
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
new file mode 100644
index 000000000..b125504ce
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT case for Spark time travel with DataFrames. */
+public class SparkTimeTravelWithDataFrameITCase extends SparkReadTestBase {
+
+    @Test
+    public void testTravelToVersion() throws Exception {
+        // snapshot 2
+        writeTable(
+                "t1",
+                GenericRow.of(7, 2L, BinaryString.fromString("7")),
+                GenericRow.of(8, 4L, BinaryString.fromString("8")));
+
+        Dataset<Row> dataset =
+                spark.read()
+                        .format("paimon")
+                        .option("path", tablePath1.toString())
+                        .option(CoreOptions.SCAN_SNAPSHOT_ID.key(), 1)
+                        .load();
+        List<Row> results = dataset.collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
+
+        dataset = spark.read().format("paimon").option("path", 
tablePath1.toString()).load();
+        results = dataset.collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3], [7,2,7], 
[8,4,8]]");
+    }
+
+    @Test
+    public void testTravelToTimestamp() throws Exception {
+        long anchor = System.currentTimeMillis();
+
+        // snapshot 2
+        writeTable(
+                "t1",
+                GenericRow.of(7, 2L, BinaryString.fromString("7")),
+                GenericRow.of(8, 4L, BinaryString.fromString("8")));
+
+        Dataset<Row> dataset =
+                spark.read()
+                        .format("paimon")
+                        .option(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), 
anchor)
+                        .load(tablePath1.toString());
+        List<Row> results = dataset.collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
+
+        dataset = spark.read().format("paimon").load(tablePath1.toString());
+        results = dataset.collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3], [7,2,7], 
[8,4,8]]");
+    }
+
+    @Test
+    public void testTravelToOldSchema() throws Exception {
+        long anchor = System.currentTimeMillis();
+
+        // new schema
+        spark.sql("ALTER TABLE t1 ADD COLUMN dt STRING");
+
+        // snapshot 2
+        writeTable(
+                "t1",
+                GenericRow.of(7, 2L, BinaryString.fromString("7"), 
BinaryString.fromString("7")),
+                GenericRow.of(8, 4L, BinaryString.fromString("8"), 
BinaryString.fromString("8")));
+
+        Dataset<Row> dataset =
+                spark.read()
+                        .format("paimon")
+                        .option("path", tablePath1.toString())
+                        .option(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), 
anchor)
+                        .load();
+        List<Row> results = dataset.collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
+
+        dataset = spark.read().format("paimon").option("path", 
tablePath1.toString()).load();
+        results = dataset.collectAsList();
+        assertThat(results.toString())
+                .isEqualTo("[[1,2,1,null], [5,6,3,null], [7,2,7,7], 
[8,4,8,8]]");
+    }
+
+    @Test
+    public void testTravelToNonExistedVersion() {
+        assertThatThrownBy(
+                        () ->
+                                spark.read()
+                                        .format("paimon")
+                                        .option("path", tablePath1.toString())
+                                        
.option(CoreOptions.SCAN_SNAPSHOT_ID.key(), 3)
+                                        .load()
+                                        .collectAsList())
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                RuntimeException.class, "Fails to read 
snapshot from path file"));
+    }
+
+    @Test
+    public void testTravelToNonExistedTimestamp() {
+        Dataset<Row> dataset =
+                spark.read()
+                        .format("paimon")
+                        .option("path", tablePath1.toString())
+                        .option(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), 0)
+                        .load();
+        assertThat(dataset.collectAsList().isEmpty());
+    }
+
+    @Test
+    public void testTravelToTag() throws Exception {
+        // snapshot 2
+        writeTable(
+                "t1",
+                GenericRow.of(7, 2L, BinaryString.fromString("7")),
+                GenericRow.of(8, 4L, BinaryString.fromString("8")));
+
+        getTable("t1").createTag("tag1", 1);
+
+        // read tag 'tag1'
+        Dataset<Row> dataset =
+                spark.read()
+                        .format("paimon")
+                        .option(CoreOptions.SCAN_TAG_NAME.key(), "tag1")
+                        .load(tablePath1.toString());
+        List<Row> results = dataset.collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
+
+        // read latest
+        dataset = spark.read().format("paimon").load(tablePath1.toString());
+        results = dataset.collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3], [7,2,7], 
[8,4,8]]");
+    }
+
+    @Test
+    public void testIllegalVersion() {
+        assertThatThrownBy(
+                        () ->
+                                spark.read()
+                                        .format("paimon")
+                                        .option("path", tablePath1.toString())
+                                        
.option(CoreOptions.SCAN_SNAPSHOT_ID.key(), 1.5)
+                                        .load()
+                                        .collectAsList())
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Could not parse value '1.5' for key 
'scan.snapshot-id'"));
+    }
+}

Reply via email to