This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4befb7134 [flink] support time travel for flink (#2194)
4befb7134 is described below

commit 4befb71344e532f594fd8b46cd2e71409c97452c
Author: JunZhang <[email protected]>
AuthorDate: Thu Nov 2 13:55:32 2023 +0800

    [flink] support time travel for flink (#2194)
---
 docs/content/how-to/querying-tables.md             |  27 +++-
 .../java/org/apache/paimon/flink/FlinkCatalog.java |  19 +++
 .../org/apache/paimon/flink/TimeTravelITCase.java  | 166 +++++++++++++++++++++
 .../java/org/apache/paimon/spark/SparkCatalog.java |  16 +-
 .../apache/paimon/spark/SparkTimeTravelITCase.java |  35 ++++-
 5 files changed, 242 insertions(+), 21 deletions(-)

diff --git a/docs/content/how-to/querying-tables.md 
b/docs/content/how-to/querying-tables.md
index 5064302fe..e2b9fc912 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -43,7 +43,7 @@ Paimon batch reads with time travel can specify a snapshot or 
a tag and read the
 
 {{< tabs "time-travel-example" >}}
 
-{{< tab "Flink" >}}
+{{< tab "Flink (dynamic option)" >}}
 ```sql
 -- read the snapshot with id 1L
 SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;
@@ -56,6 +56,17 @@ SELECT * FROM t /*+ OPTIONS('scan.tag-name' = 'my-tag') */;
 ```
 {{< /tab >}}
 
+{{< tab "Flink 1.18+" >}}
+Flink SQL supports time travel syntax since version 1.18.
+```sql
+-- read the snapshot as of the specified timestamp
+SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00';
+
+-- you can also use simple expressions (see the Flink documentation for the 
supported functions)
+SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00' + 
INTERVAL '1' DAY
+```
+{{< /tab >}}
+
 {{< tab "Spark3" >}}
 
 Requires Spark 3.3+.
@@ -274,7 +285,7 @@ SELECT * FROM t WHERE dt > '2023-06-26';
 If it's not a partitioned table, or you can't filter by partition, you can use 
Time travel's stream read.
 
 {{< tabs "streaming-time-travel" >}}
-{{< tab "Flink" >}}
+{{< tab "Flink (dynamic option)" >}}
 ```sql
 -- read changes from snapshot id 1L 
 SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;
@@ -286,6 +297,18 @@ SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = 
'1678883047356') */;
 SELECT * FROM t /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;
 ```
 {{< /tab >}}
+
+{{< tab "Flink 1.18+" >}}
+Flink SQL supports time travel syntax since version 1.18.
+```sql
+-- read the snapshot as of the specified timestamp
+SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00';
+
+-- you can also use simple expressions (see the Flink documentation for the 
supported functions)
+SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00' + 
INTERVAL '1' DAY
+```
+{{< /tab >}}
+
 {{< /tabs >}}
 
 ### Consumer ID
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 260b90a5f..8606f8441 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -227,6 +227,19 @@ public class FlinkCatalog extends AbstractCatalog {
     @Override
     public CatalogTable getTable(ObjectPath tablePath)
             throws TableNotExistException, CatalogException {
+        return getTable(tablePath, null);
+    }
+
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain 
compatibility with Flink 1.17 and earlier.
+     */
+    public CatalogTable getTable(ObjectPath tablePath, long timestamp)
+            throws TableNotExistException, CatalogException {
+        return getTable(tablePath, Long.valueOf(timestamp));
+    }
+
+    private CatalogTable getTable(ObjectPath tablePath, @Nullable Long 
timestamp)
+            throws TableNotExistException {
         Table table;
         try {
             table = catalog.getTable(toIdentifier(tablePath));
@@ -234,6 +247,12 @@ public class FlinkCatalog extends AbstractCatalog {
             throw new TableNotExistException(getName(), tablePath);
         }
 
+        if (timestamp != null) {
+            Options options = new Options();
+            options.set(CoreOptions.SCAN_TIMESTAMP_MILLIS, timestamp);
+            table = table.copy(options.toMap());
+        }
+
         if (table instanceof FileStoreTable) {
             return toCatalogTable(table);
         } else {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
new file mode 100644
index 000000000..8cbc4d5b1
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.utils.BlockingIterator;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for Flink time travel. */
+public class TimeTravelITCase extends CatalogITCaseBase {
+
+    // -------------------------------------------------------
+    //                          Batch
+    // -------------------------------------------------------
+
+    @Test
+    public void testTravelToTimestampString() throws Exception {
+        sql("CREATE TABLE t (k INT, v STRING)");
+
+        // snapshot 1
+        sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')");
+        Thread.sleep(3000);
+        String anchor = now();
+        // snapshot 2
+        sql("INSERT INTO t VALUES(1, 'flink'), (2, 'paimon')");
+
+        List<Row> result = sql("SELECT * FROM t");
+        assertThat(result.toString())
+                .isEqualTo("[+I[1, hello], +I[2, world], +I[1, flink], +I[2, 
paimon]]");
+
+        // time travel to snapshot 1
+        result = sql("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '%s'", 
anchor);
+        assertThat(result.toString()).isEqualTo("[+I[1, hello], +I[2, 
world]]");
+    }
+
+    @Test
+    public void testExpression() throws Exception {
+        sql("CREATE TABLE t (k INT, v STRING)");
+
+        // snapshot 1
+        sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')");
+
+        String anchor = now();
+        Thread.sleep(3000);
+
+        // snapshot 2
+        sql("INSERT INTO t VALUES(1, 'flink'), (2, 'paimon')");
+
+        List<Row> result = sql("SELECT * FROM t");
+        assertThat(result.toString())
+                .isEqualTo("[+I[1, hello], +I[2, world], +I[1, flink], +I[2, 
paimon]]");
+
+        // time travel to snapshot 1
+        result =
+                sql(
+                        "SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '%s' 
+ INTERVAL '1' SECOND",
+                        anchor);
+        assertThat(result.toString()).isEqualTo("[+I[1, hello], +I[2, 
world]]");
+    }
+
+    @Test
+    public void testTravelToOldSchema() throws Exception {
+        // old schema
+        sql("CREATE TABLE t (k INT, v STRING)");
+
+        // snapshot 1
+        sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')");
+
+        Thread.sleep(3000);
+        String anchor = now();
+
+        // new schema
+        sql("ALTER TABLE t ADD dt STRING");
+
+        // snapshot 2
+        sql("INSERT INTO t VALUES(1, 'flink', '2020-01-01'), (2, 'paimon', 
'2020-01-02')");
+
+        List<Row> result = sql("SELECT * FROM t");
+        assertThat(result.toString())
+                .isEqualTo(
+                        "[+I[1, hello, null], +I[2, world, null], +I[1, flink, 
2020-01-01], +I[2, paimon, 2020-01-02]]");
+
+        // time travel to snapshot 1
+        result = sql("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '%s'", 
anchor);
+        assertThat(result.toString()).isEqualTo("[+I[1, hello], +I[2, 
world]]");
+    }
+
+    @Test
+    public void testTravelToNonExistedTimestamp() {
+        sql("CREATE TABLE t (k INT, v STRING)");
+        sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')");
+        assertThat(sql("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP 
'1900-01-01 00:00:00'"))
+                .isEmpty();
+    }
+
+    @Test
+    public void testSystemTableTimeTravel() throws Exception {
+        sql("CREATE TABLE t (k INT, v STRING)");
+
+        // snapshot 1
+        sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')");
+
+        Thread.sleep(3000);
+        String anchor = now();
+
+        // snapshot 2
+        sql("INSERT INTO t VALUES(1, 'flink'), (2, 'paimon')");
+
+        List<Row> result = sql("SELECT * FROM t$files");
+        assertThat(result.size()).isEqualTo(2);
+
+        // time travel to snapshot 1
+        result = sql("SELECT * FROM t$files FOR SYSTEM_TIME AS OF TIMESTAMP 
'%s'", anchor);
+
+        assertThat(result.size()).isEqualTo(1);
+    }
+
+    // -------------------------------------------------------
+    //                        Streaming
+    // -------------------------------------------------------
+
+    @Test
+    public void testStreamingTravel() throws Exception {
+        sql("CREATE TABLE t (k INT PRIMARY KEY NOT ENFORCED, v STRING)");
+
+        BlockingIterator<Row, Row> streamIter =
+                streamSqlBlockIter("SELECT * FROM t FOR SYSTEM_TIME AS OF 
TIMESTAMP '%s'", now());
+
+        // snapshot 1
+        sql("INSERT INTO t VALUES(1, 'hello')");
+        // snapshot 2
+        sql("INSERT INTO t VALUES(1, 'apache')");
+
+        List<Row> result = streamIter.collect(3);
+        assertThat(result.toString()).isEqualTo("[+I[1, hello], -U[1, hello], 
+U[1, apache]]");
+
+        streamIter.close();
+    }
+
+    private String now() {
+        return 
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index d81d0517b..07e0e9045 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -29,7 +29,6 @@ import 
org.apache.paimon.spark.analysis.NoSuchProcedureException;
 import org.apache.paimon.spark.catalog.ProcedureCatalog;
 import org.apache.paimon.spark.procedure.Procedure;
 import org.apache.paimon.spark.procedure.ProcedureBuilder;
-import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.Preconditions;
 
@@ -218,7 +217,7 @@ public class SparkCatalog implements TableCatalog, 
ProcedureCatalog, SupportsNam
      * Do not annotate with <code>@override</code> here to maintain 
compatibility with Spark 3.2-.
      */
     public SparkTable loadTable(Identifier ident, String version) throws 
NoSuchTableException {
-        Table table = loadAndRequireDataTable(ident);
+        Table table = loadPaimonTable(ident);
         Options dynamicOptions = new Options();
 
         if (version.chars().allMatch(Character::isDigit)) {
@@ -240,7 +239,7 @@ public class SparkCatalog implements TableCatalog, 
ProcedureCatalog, SupportsNam
      * TableCatalog#loadTable(Identifier, long)}). But in SQL you should use 
seconds.
      */
     public SparkTable loadTable(Identifier ident, long timestamp) throws 
NoSuchTableException {
-        Table table = loadAndRequireDataTable(ident);
+        Table table = loadPaimonTable(ident);
         // Paimon's timestamp use millisecond
         timestamp = timestamp / 1000;
 
@@ -250,16 +249,9 @@ public class SparkCatalog implements TableCatalog, 
ProcedureCatalog, SupportsNam
         return new SparkTable(table.copy(option.toMap()));
     }
 
-    private Table loadAndRequireDataTable(Identifier ident) throws 
NoSuchTableException {
+    private Table loadPaimonTable(Identifier ident) throws 
NoSuchTableException {
         try {
-            Table table = load(ident);
-            if (!(table instanceof DataTable)) {
-                throw new UnsupportedOperationException(
-                        String.format(
-                                "Only DataTable supports time travel but given 
table type is '%s'.",
-                                table.getClass().getName()));
-            }
-            return table;
+            return load(ident);
         } catch (Catalog.TableNotExistException e) {
             throw new NoSuchTableException(ident);
         }
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
index 7bf825d02..20c87df89 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
@@ -71,7 +71,6 @@ public class SparkTimeTravelITCase extends SparkReadTestBase {
                 GenericRow.of(2, BinaryString.fromString("Paimon")));
 
         String anchor = LocalDateTime.now().toString();
-        // Thread.sleep(1000);
 
         // snapshot 2
         writeTable(
@@ -168,14 +167,36 @@ public class SparkTimeTravelITCase extends 
SparkReadTestBase {
     }
 
     @Test
-    public void testUnsupportedSystemTableTimeTravel() {
+    public void testSystemTableTimeTravel() throws Exception {
         spark.sql("CREATE TABLE t (k INT, v STRING)");
 
-        assertThatThrownBy(() -> spark.sql("SELECT * FROM `t$snapshots` 
VERSION AS OF 1"))
-                .satisfies(
-                        AssertionUtils.anyCauseMatches(
-                                UnsupportedOperationException.class,
-                                "Only DataTable supports time travel but given 
table type is 'org.apache.paimon.table.system.SnapshotsTable'"));
+        // snapshot 1
+        writeTable(
+                "t",
+                GenericRow.of(1, BinaryString.fromString("Hello")),
+                GenericRow.of(2, BinaryString.fromString("Paimon")));
+
+        String anchor = LocalDateTime.now().toString();
+
+        // snapshot 2
+        writeTable(
+                "t",
+                GenericRow.of(3, BinaryString.fromString("Test")),
+                GenericRow.of(4, BinaryString.fromString("Case")));
+
+        assertThat(spark.sql("SELECT * FROM 
`t$files`").collectAsList().size()).isEqualTo(2);
+
+        // time travel to snapshot 1
+        assertThat(spark.sql("SELECT * FROM `t$files` VERSION AS OF 
1").collectAsList().size())
+                .isEqualTo(1);
+        assertThat(
+                        spark.sql(
+                                        String.format(
+                                                "SELECT * FROM `t$files` 
TIMESTAMP AS OF '%s'",
+                                                anchor))
+                                .collectAsList()
+                                .size())
+                .isEqualTo(1);
     }
 
     @Test

Reply via email to