This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9942d4265 [test][doc] Spark time travel with data frame test and doc
(#1497)
9942d4265 is described below
commit 9942d4265ba56429cb725e7fe57e1eb6cb44a1b1
Author: Liwei Li <[email protected]>
AuthorDate: Thu Jul 6 17:27:52 2023 +0800
[test][doc] Spark time travel with data frame test and doc (#1497)
---
docs/content/how-to/querying-tables.md | 33 ++++
.../spark/SparkTimeTravelWithDataFrameITCase.java | 177 +++++++++++++++++++++
2 files changed, 210 insertions(+)
diff --git a/docs/content/how-to/querying-tables.md
b/docs/content/how-to/querying-tables.md
index 26cab1dde..952b5f4bf 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -53,6 +53,39 @@ SELECT * FROM t /*+ OPTIONS('scan.tag-name' = 'my-tag') */;
{{< tab "Spark3" >}}
+#### DataFrame
+
+To select a specific table snapshot or the snapshot at some time in the
DataFrame API, Paimon supports:
+
+* `scan.snapshot-id` selects a specific table snapshot
+* `scan.timestamp-millis` selects the snapshot that was current at the given
timestamp, in milliseconds
+* `scan.tag-name` selects a specific table snapshot by tag name
+
+```scala
+// read the snapshot as of the specified Unix timestamp, in milliseconds
+spark.read
+ .option("scan.timestamp-millis", "1678883047000")
+ .format("paimon")
+ .load("path/to/table")
+```
+
+```scala
+// read the snapshot with id 1L (use snapshot id as version)
+spark.read
+ .option("scan.snapshot-id", 1)
+ .format("paimon")
+ .load("path/to/table")
+```
+
+```scala
+// read tag 'my-tag'
+spark.read
+ .option("scan.tag-name", "my-tag")
+ .format("paimon")
+ .load("path/to/table")
+```
+
+#### SQL
Requires Spark 3.3+.
you can use `VERSION AS OF` and `TIMESTAMP AS OF` in query to do time travel:
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
new file mode 100644
index 000000000..b125504ce
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT case for Spark time travel with DataFrames. */
+public class SparkTimeTravelWithDataFrameITCase extends SparkReadTestBase {
+
+ /**
+ * Time travel by snapshot id: after one more write to t1, a read with
+ * CoreOptions.SCAN_SNAPSHOT_ID set to 1 must return only the two earlier rows, while a
+ * read without the option returns all four rows.
+ *
+ * <p>NOTE(review): the two earlier rows are presumably written by the SparkReadTestBase
+ * setup, which is not visible here - confirm.
+ */
+ @Test
+ public void testTravelToVersion() throws Exception {
+ // snapshot 2
+ writeTable(
+ "t1",
+ GenericRow.of(7, 2L, BinaryString.fromString("7")),
+ GenericRow.of(8, 4L, BinaryString.fromString("8")));
+
+ // read pinned to snapshot id 1; the table location is passed through the "path" option
+ Dataset<Row> dataset =
+ spark.read()
+ .format("paimon")
+ .option("path", tablePath1.toString())
+ .option(CoreOptions.SCAN_SNAPSHOT_ID.key(), 1)
+ .load();
+ List<Row> results = dataset.collectAsList();
+ assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
+
+ // read without any scan option: expected to include the rows written above
+ dataset = spark.read().format("paimon").option("path",
tablePath1.toString()).load();
+ results = dataset.collectAsList();
+ assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3], [7,2,7],
[8,4,8]]");
+ }
+
+ /**
+ * Time travel by timestamp: a wall-clock anchor is captured before the second write, and
+ * a read with CoreOptions.SCAN_TIMESTAMP_MILLIS set to that anchor must return only the
+ * rows that existed before the write; a plain read returns all four rows.
+ */
+ @Test
+ public void testTravelToTimestamp() throws Exception {
+ // captured before the write below, so the anchor precedes the new rows
+ long anchor = System.currentTimeMillis();
+
+ // snapshot 2
+ writeTable(
+ "t1",
+ GenericRow.of(7, 2L, BinaryString.fromString("7")),
+ GenericRow.of(8, 4L, BinaryString.fromString("8")));
+
+ // read as of the anchor; here the table location is passed to load(...) directly
+ Dataset<Row> dataset =
+ spark.read()
+ .format("paimon")
+ .option(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
anchor)
+ .load(tablePath1.toString());
+ List<Row> results = dataset.collectAsList();
+ assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
+
+ // read without any scan option: expected to include the rows written above
+ dataset = spark.read().format("paimon").load(tablePath1.toString());
+ results = dataset.collectAsList();
+ assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3], [7,2,7],
[8,4,8]]");
+ }
+
+ /**
+ * Time travel across a schema change: an anchor timestamp is captured, a STRING column
+ * "dt" is added to t1, and four-field rows are written. A read as of the anchor must
+ * return the earlier rows with the three-column shape; a plain read returns every row
+ * with four columns, with null in "dt" for the rows written before the column existed.
+ */
+ @Test
+ public void testTravelToOldSchema() throws Exception {
+ // captured before the ALTER TABLE and the write below
+ long anchor = System.currentTimeMillis();
+
+ // new schema
+ spark.sql("ALTER TABLE t1 ADD COLUMN dt STRING");
+
+ // snapshot 2
+ writeTable(
+ "t1",
+ GenericRow.of(7, 2L, BinaryString.fromString("7"),
BinaryString.fromString("7")),
+ GenericRow.of(8, 4L, BinaryString.fromString("8"),
BinaryString.fromString("8")));
+
+ // read as of the anchor: expected rows carry no "dt" column
+ Dataset<Row> dataset =
+ spark.read()
+ .format("paimon")
+ .option("path", tablePath1.toString())
+ .option(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
anchor)
+ .load();
+ List<Row> results = dataset.collectAsList();
+ assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
+
+ // read without any scan option: expected rows carry the added column
+ dataset = spark.read().format("paimon").option("path",
tablePath1.toString()).load();
+ results = dataset.collectAsList();
+ assertThat(results.toString())
+ .isEqualTo("[[1,2,1,null], [5,6,3,null], [7,2,7,7],
[8,4,8,8]]");
+ }
+
+ /**
+ * Reading with CoreOptions.SCAN_SNAPSHOT_ID set to 3 - an id this test never creates -
+ * must fail; the thrown exception is checked through AssertionUtils.anyCauseMatches
+ * against RuntimeException and the snapshot-read failure message.
+ */
+ @Test
+ public void testTravelToNonExistedVersion() {
+ // the lambda covers both load() and collectAsList(), so a failure in either is caught
+ assertThatThrownBy(
+ () ->
+ spark.read()
+ .format("paimon")
+ .option("path", tablePath1.toString())
+
.option(CoreOptions.SCAN_SNAPSHOT_ID.key(), 3)
+ .load()
+ .collectAsList())
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ RuntimeException.class, "Fails to read
snapshot from path file"));
+ }
+
+ /**
+ * Time travel to epoch millisecond 0, which predates any snapshot of the table, must
+ * produce an empty result rather than an error.
+ */
+ @Test
+ public void testTravelToNonExistedTimestamp() {
+ Dataset<Row> dataset =
+ spark.read()
+ .format("paimon")
+ .option("path", tablePath1.toString())
+ .option(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), 0)
+ .load();
+ // Assert on the list itself: assertThat(boolean) with no chained check verifies nothing.
+ assertThat(dataset.collectAsList()).isEmpty();
+ }
+
+ /**
+ * Time travel by tag: after a second write, tag "tag1" is created on t1 and a read with
+ * CoreOptions.SCAN_TAG_NAME set to "tag1" must return only the two earlier rows, while a
+ * plain read returns all four rows.
+ */
+ @Test
+ public void testTravelToTag() throws Exception {
+ // snapshot 2
+ writeTable(
+ "t1",
+ GenericRow.of(7, 2L, BinaryString.fromString("7")),
+ GenericRow.of(8, 4L, BinaryString.fromString("8")));
+
+ // NOTE(review): the second argument is presumably the snapshot id the tag points to - confirm
+ getTable("t1").createTag("tag1", 1);
+
+ // read tag 'tag1'
+ Dataset<Row> dataset =
+ spark.read()
+ .format("paimon")
+ .option(CoreOptions.SCAN_TAG_NAME.key(), "tag1")
+ .load(tablePath1.toString());
+ List<Row> results = dataset.collectAsList();
+ assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
+
+ // read latest
+ dataset = spark.read().format("paimon").load(tablePath1.toString());
+ results = dataset.collectAsList();
+ assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3], [7,2,7],
[8,4,8]]");
+ }
+
+ /**
+ * Passing the non-integer value 1.5 for CoreOptions.SCAN_SNAPSHOT_ID must fail; the
+ * thrown exception is checked through AssertionUtils.anyCauseMatches against
+ * IllegalArgumentException and the option-parse failure message.
+ */
+ @Test
+ public void testIllegalVersion() {
+ // the lambda covers both load() and collectAsList(), so a failure in either is caught
+ assertThatThrownBy(
+ () ->
+ spark.read()
+ .format("paimon")
+ .option("path", tablePath1.toString())
+
.option(CoreOptions.SCAN_SNAPSHOT_ID.key(), 1.5)
+ .load()
+ .collectAsList())
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ IllegalArgumentException.class,
+ "Could not parse value '1.5' for key
'scan.snapshot-id'"));
+ }
+}