This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4befb7134 [flink] support time travel for flink (#2194)
4befb7134 is described below
commit 4befb71344e532f594fd8b46cd2e71409c97452c
Author: JunZhang <[email protected]>
AuthorDate: Thu Nov 2 13:55:32 2023 +0800
[flink] support time travel for flink (#2194)
---
docs/content/how-to/querying-tables.md | 27 +++-
.../java/org/apache/paimon/flink/FlinkCatalog.java | 19 +++
.../org/apache/paimon/flink/TimeTravelITCase.java | 166 +++++++++++++++++++++
.../java/org/apache/paimon/spark/SparkCatalog.java | 16 +-
.../apache/paimon/spark/SparkTimeTravelITCase.java | 35 ++++-
5 files changed, 242 insertions(+), 21 deletions(-)
diff --git a/docs/content/how-to/querying-tables.md
b/docs/content/how-to/querying-tables.md
index 5064302fe..e2b9fc912 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -43,7 +43,7 @@ Paimon batch reads with time travel can specify a snapshot or
a tag and read the
{{< tabs "time-travel-example" >}}
-{{< tab "Flink" >}}
+{{< tab "Flink (dynamic option)" >}}
```sql
-- read the snapshot with id 1L
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;
@@ -56,6 +56,17 @@ SELECT * FROM t /*+ OPTIONS('scan.tag-name' = 'my-tag') */;
```
{{< /tab >}}
+{{< tab "Flink 1.18+" >}}
+Flink SQL supports time travel syntax since version 1.18.
+```sql
+-- read the snapshot from specified timestamp
+SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00';
+
+-- you can also use simple expressions (see the Flink documentation for the
supported functions)
+SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00' +
INTERVAL '1' DAY
+```
+{{< /tab >}}
+
{{< tab "Spark3" >}}
Requires Spark 3.3+.
@@ -274,7 +285,7 @@ SELECT * FROM t WHERE dt > '2023-06-26';
If it's not a partitioned table, or you can't filter by partition, you can use
Time travel's stream read.
{{< tabs "streaming-time-travel" >}}
-{{< tab "Flink" >}}
+{{< tab "Flink (dynamic option)" >}}
```sql
-- read changes from snapshot id 1L
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;
@@ -286,6 +297,18 @@ SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' =
'1678883047356') */;
SELECT * FROM t /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;
```
{{< /tab >}}
+
+{{< tab "Flink 1.18+" >}}
+Flink SQL supports time travel syntax since version 1.18.
+```sql
+-- read the snapshot from specified timestamp
+SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00';
+
+-- you can also use simple expressions (see the Flink documentation for the
supported functions)
+SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00' +
INTERVAL '1' DAY
+```
+{{< /tab >}}
+
{{< /tabs >}}
### Consumer ID
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 260b90a5f..8606f8441 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -227,6 +227,19 @@ public class FlinkCatalog extends AbstractCatalog {
@Override
public CatalogTable getTable(ObjectPath tablePath)
throws TableNotExistException, CatalogException {
+ return getTable(tablePath, null);
+ }
+
+ /**
+ * Do not annotate with <code>@override</code> here to maintain
compatibility with Flink 1.17-.
+ */
+ public CatalogTable getTable(ObjectPath tablePath, long timestamp)
+ throws TableNotExistException, CatalogException {
+ return getTable(tablePath, Long.valueOf(timestamp));
+ }
+
+ private CatalogTable getTable(ObjectPath tablePath, @Nullable Long
timestamp)
+ throws TableNotExistException {
Table table;
try {
table = catalog.getTable(toIdentifier(tablePath));
@@ -234,6 +247,12 @@ public class FlinkCatalog extends AbstractCatalog {
throw new TableNotExistException(getName(), tablePath);
}
+ if (timestamp != null) {
+ Options options = new Options();
+ options.set(CoreOptions.SCAN_TIMESTAMP_MILLIS, timestamp);
+ table = table.copy(options.toMap());
+ }
+
if (table instanceof FileStoreTable) {
return toCatalogTable(table);
} else {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
new file mode 100644
index 000000000..8cbc4d5b1
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.utils.BlockingIterator;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for flink time travel. */
+public class TimeTravelITCase extends CatalogITCaseBase {
+
+ // -------------------------------------------------------
+ // Batch
+ // -------------------------------------------------------
+
+ @Test
+ public void testTravelToTimestampString() throws Exception {
+ sql("CREATE TABLE t (k INT, v STRING)");
+
+ // snapshot 1
+ sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')");
+ Thread.sleep(3000);
+ String anchor = now();
+ // snapshot 2
+ sql("INSERT INTO t VALUES(1, 'flink'), (2, 'paimon')");
+
+ List<Row> result = sql("SELECT * FROM t");
+ assertThat(result.toString())
+ .isEqualTo("[+I[1, hello], +I[2, world], +I[1, flink], +I[2,
paimon]]");
+
+ // time travel to snapshot 1
+ result = sql("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '%s'",
anchor);
+ assertThat(result.toString()).isEqualTo("[+I[1, hello], +I[2,
world]]");
+ }
+
+ @Test
+ public void testExpression() throws Exception {
+ sql("CREATE TABLE t (k INT, v STRING)");
+
+ // snapshot 1
+ sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')");
+
+ String anchor = now();
+ Thread.sleep(3000);
+
+ // snapshot 2
+ sql("INSERT INTO t VALUES(1, 'flink'), (2, 'paimon')");
+
+ List<Row> result = sql("SELECT * FROM t");
+ assertThat(result.toString())
+ .isEqualTo("[+I[1, hello], +I[2, world], +I[1, flink], +I[2,
paimon]]");
+
+ // time travel to snapshot 1
+ result =
+ sql(
+ "SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '%s'
+ INTERVAL '1' SECOND",
+ anchor);
+ assertThat(result.toString()).isEqualTo("[+I[1, hello], +I[2,
world]]");
+ }
+
+ @Test
+ public void testTravelToOldSchema() throws Exception {
+ // old schema
+ sql("CREATE TABLE t (k INT, v STRING)");
+
+ // snapshot 1
+ sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')");
+
+ Thread.sleep(3000);
+ String anchor = now();
+
+ // new schema
+ sql("ALTER TABLE t ADD dt STRING");
+
+ // snapshot 2
+ sql("INSERT INTO t VALUES(1, 'flink', '2020-01-01'), (2, 'paimon',
'2020-01-02')");
+
+ List<Row> result = sql("SELECT * FROM t");
+ assertThat(result.toString())
+ .isEqualTo(
+ "[+I[1, hello, null], +I[2, world, null], +I[1, flink,
2020-01-01], +I[2, paimon, 2020-01-02]]");
+
+ // time travel to snapshot 1
+ result = sql("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP '%s'",
anchor);
+ assertThat(result.toString()).isEqualTo("[+I[1, hello], +I[2,
world]]");
+ }
+
+ @Test
+ public void testTravelToNonExistedTimestamp() {
+ sql("CREATE TABLE t (k INT, v STRING)");
+ sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')");
+ assertThat(sql("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP
'1900-01-01 00:00:00'"))
+ .isEmpty();
+ }
+
+ @Test
+ public void testSystemTableTimeTravel() throws Exception {
+ sql("CREATE TABLE t (k INT, v STRING)");
+
+ // snapshot 1
+ sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')");
+
+ Thread.sleep(3000);
+ String anchor = now();
+
+ // snapshot 2
+ sql("INSERT INTO t VALUES(1, 'flink'), (2, 'paimon')");
+
+ List<Row> result = sql("SELECT * FROM t$files");
+ assertThat(result.size()).isEqualTo(2);
+
+ // time travel to snapshot 1
+ result = sql("SELECT * FROM t$files FOR SYSTEM_TIME AS OF TIMESTAMP
'%s'", anchor);
+
+ assertThat(result.size()).isEqualTo(1);
+ }
+
+ // -------------------------------------------------------
+ // Streaming
+ // -------------------------------------------------------
+
+ @Test
+ public void testStreamingTravel() throws Exception {
+ sql("CREATE TABLE t (k INT PRIMARY KEY NOT ENFORCED, v STRING)");
+
+ BlockingIterator<Row, Row> streamIter =
+ streamSqlBlockIter("SELECT * FROM t FOR SYSTEM_TIME AS OF
TIMESTAMP '%s'", now());
+
+ // snapshot 1
+ sql("INSERT INTO t VALUES(1, 'hello')");
+ // snapshot 2
+ sql("INSERT INTO t VALUES(1, 'apache')");
+
+ List<Row> result = streamIter.collect(3);
+ assertThat(result.toString()).isEqualTo("[+I[1, hello], -U[1, hello],
+U[1, apache]]");
+
+ streamIter.close();
+ }
+
+ private String now() {
+ return
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index d81d0517b..07e0e9045 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -29,7 +29,6 @@ import
org.apache.paimon.spark.analysis.NoSuchProcedureException;
import org.apache.paimon.spark.catalog.ProcedureCatalog;
import org.apache.paimon.spark.procedure.Procedure;
import org.apache.paimon.spark.procedure.ProcedureBuilder;
-import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;
@@ -218,7 +217,7 @@ public class SparkCatalog implements TableCatalog,
ProcedureCatalog, SupportsNam
* Do not annotate with <code>@override</code> here to maintain
compatibility with Spark 3.2-.
*/
public SparkTable loadTable(Identifier ident, String version) throws
NoSuchTableException {
- Table table = loadAndRequireDataTable(ident);
+ Table table = loadPaimonTable(ident);
Options dynamicOptions = new Options();
if (version.chars().allMatch(Character::isDigit)) {
@@ -240,7 +239,7 @@ public class SparkCatalog implements TableCatalog,
ProcedureCatalog, SupportsNam
* TableCatalog#loadTable(Identifier, long)}). But in SQL you should use
seconds.
*/
public SparkTable loadTable(Identifier ident, long timestamp) throws
NoSuchTableException {
- Table table = loadAndRequireDataTable(ident);
+ Table table = loadPaimonTable(ident);
// Paimon's timestamp use millisecond
timestamp = timestamp / 1000;
@@ -250,16 +249,9 @@ public class SparkCatalog implements TableCatalog,
ProcedureCatalog, SupportsNam
return new SparkTable(table.copy(option.toMap()));
}
- private Table loadAndRequireDataTable(Identifier ident) throws
NoSuchTableException {
+ private Table loadPaimonTable(Identifier ident) throws
NoSuchTableException {
try {
- Table table = load(ident);
- if (!(table instanceof DataTable)) {
- throw new UnsupportedOperationException(
- String.format(
- "Only DataTable supports time travel but given
table type is '%s'.",
- table.getClass().getName()));
- }
- return table;
+ return load(ident);
} catch (Catalog.TableNotExistException e) {
throw new NoSuchTableException(ident);
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
index 7bf825d02..20c87df89 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
@@ -71,7 +71,6 @@ public class SparkTimeTravelITCase extends SparkReadTestBase {
GenericRow.of(2, BinaryString.fromString("Paimon")));
String anchor = LocalDateTime.now().toString();
- // Thread.sleep(1000);
// snapshot 2
writeTable(
@@ -168,14 +167,36 @@ public class SparkTimeTravelITCase extends
SparkReadTestBase {
}
@Test
- public void testUnsupportedSystemTableTimeTravel() {
+ public void testSystemTableTimeTravel() throws Exception {
spark.sql("CREATE TABLE t (k INT, v STRING)");
- assertThatThrownBy(() -> spark.sql("SELECT * FROM `t$snapshots`
VERSION AS OF 1"))
- .satisfies(
- AssertionUtils.anyCauseMatches(
- UnsupportedOperationException.class,
- "Only DataTable supports time travel but given
table type is 'org.apache.paimon.table.system.SnapshotsTable'"));
+ // snapshot 1
+ writeTable(
+ "t",
+ GenericRow.of(1, BinaryString.fromString("Hello")),
+ GenericRow.of(2, BinaryString.fromString("Paimon")));
+
+ String anchor = LocalDateTime.now().toString();
+
+ // snapshot 2
+ writeTable(
+ "t",
+ GenericRow.of(3, BinaryString.fromString("Test")),
+ GenericRow.of(4, BinaryString.fromString("Case")));
+
+ assertThat(spark.sql("SELECT * FROM
`t$files`").collectAsList().size()).isEqualTo(2);
+
+ // time travel to snapshot 1
+ assertThat(spark.sql("SELECT * FROM `t$files` VERSION AS OF
1").collectAsList().size())
+ .isEqualTo(1);
+ assertThat(
+ spark.sql(
+ String.format(
+ "SELECT * FROM `t$files`
TIMESTAMP AS OF '%s'",
+ anchor))
+ .collectAsList()
+ .size())
+ .isEqualTo(1);
}
@Test