This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 17be3716be4 [FLINK-32646][table-planner] Fix TestTimeTravelCatalog
missing snapshot version and add version info into source table's digest
17be3716be4 is described below
commit 17be3716be42cd9dc5986e97ae222971d9d19be7
Author: lincoln lee <[email protected]>
AuthorDate: Sun Jul 23 06:21:12 2023 +0800
[FLINK-32646][table-planner] Fix TestTimeTravelCatalog missing snapshot
version and add version info into source table's digest
This closes #23047
---
.../common/CommonPhysicalTableSourceScan.scala | 13 ++
.../planner/factories/TestTimeTravelCatalog.java | 2 +-
.../planner/plan/batch/sql/TimeTravelTest.java | 191 +++++++++++++++++++++
.../runtime/batch/sql/TimeTravelITCase.java | 141 +--------------
.../planner/runtime/utils/TimeTravelTestUtil.java | 84 +++++++++
.../planner/plan/batch/sql/TimeTravelTest.xml | 170 ++++++++++++++++++
.../table/planner/utils/DateTimeTestUtil.scala | 17 ++
7 files changed, 485 insertions(+), 133 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalTableSourceScan.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalTableSourceScan.scala
index 68ffef98570..fd98515326a 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalTableSourceScan.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalTableSourceScan.scala
@@ -17,6 +17,7 @@
*/
package org.apache.flink.table.planner.plan.nodes.physical.common
+import org.apache.flink.table.catalog.{CatalogBaseTable, CatalogTable}
import org.apache.flink.table.connector.source.ScanTableSource
import org.apache.flink.table.planner.plan.schema.TableSourceTable
import org.apache.flink.table.planner.plan.utils.RelExplainUtil
@@ -51,9 +52,21 @@ abstract class CommonPhysicalTableSourceScan(
}
override def explainTerms(pw: RelWriter): RelWriter = {
+ val version = extractSnapshotVersion()
super
.explainTerms(pw)
.item("fields", getRowType.getFieldNames.asScala.mkString(", "))
.itemIf("hints", RelExplainUtil.hintsToString(getHints),
!getHints.isEmpty)
+ .itemIf("version", version.getOrElse(""), version.isDefined)
+ }
+
+  private def extractSnapshotVersion(): Option[String] = {
+    // Only a CatalogTable carrying a snapshot yields a version string; anything else yields None.
+    relOptTable.contextResolvedTable.getTable.asInstanceOf[CatalogBaseTable] match {
+      case table: CatalogTable =>
+        val snapshot = table.getSnapshot
+        if (snapshot.isPresent) Some(snapshot.get().toString) else None
+      case _ => None
+    }
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestTimeTravelCatalog.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestTimeTravelCatalog.java
index e5c6336ffe7..6b08c7a1682 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestTimeTravelCatalog.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestTimeTravelCatalog.java
@@ -77,7 +77,7 @@ public class TestTimeTravelCatalog extends
GenericInMemoryCatalog {
String tableName, Schema schema, Map<String, String> properties,
long timestamp)
throws TableAlreadyExistException, DatabaseNotExistException,
TableNotExistException {
CatalogTable catalogTable =
- CatalogTable.of(schema, "", Collections.emptyList(),
properties);
+ CatalogTable.of(schema, "", Collections.emptyList(),
properties, timestamp);
ObjectPath objectPath = new ObjectPath(getDefaultDatabase(),
tableName);
if (!timeTravelTables.containsKey(objectPath)) {
timeTravelTables.put(objectPath, new ArrayList<>());
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.java
new file mode 100644
index 00000000000..92b0a16bbe6
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.factories.TestTimeTravelCatalog;
+import org.apache.flink.table.planner.runtime.utils.TimeTravelTestUtil;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.DateTimeTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.ZoneId;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Plan tests for time travel. */
+public class TimeTravelTest extends TableTestBase {
+
+ private BatchTableTestUtil util;
+
+ @Before
+ public void before() {
+ util = batchTestUtil(TableConfig.getDefault());
+ String catalogName = "TimeTravelCatalog";
+ TestTimeTravelCatalog catalog =
+
TimeTravelTestUtil.getTestingCatalogWithVersionedTable(catalogName, "t1");
+ TableEnvironment tEnv = util.tableEnv();
+ tEnv.registerCatalog(catalogName, catalog);
+ tEnv.useCatalog(catalogName);
+ tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
+ }
+
+ @Test
+ public void testTimeTravel() {
+ util.verifyExecPlan(
+ "SELECT * FROM t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01
02:00:00'");
+ }
+
+ @Test
+ public void testTimeTravelWithAsExpression() {
+ util.verifyExecPlan(
+ "SELECT * FROM t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01
02:00:00' AS t2");
+ }
+
+ @Test
+ public void testTimeTravelWithSimpleExpression() {
+ util.verifyExecPlan(
+ "SELECT * FROM t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01
00:00:00'+INTERVAL '60' DAY");
+ }
+
+ @Test
+ public void testTimeTravelWithDifferentTimezone() {
+
util.tableEnv().getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
+
+ util.verifyExecPlan(
+ String.format(
+ "SELECT * FROM t1 FOR SYSTEM_TIME AS OF TIMESTAMP
'%s'",
+ DateTimeTestUtil.timezoneConvert(
+ "2023-01-01 02:00:00",
+ "yyyy-MM-dd HH:mm:ss",
+ ZoneId.of("UTC"),
+ ZoneId.of("Asia/Shanghai"))));
+ }
+
+ @Test
+ public void testTimeTravelOneTableMultiTimes() {
+ util.verifyExecPlan(
+ "SELECT\n"
+ + " f1\n"
+ + "FROM\n"
+ + " t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01
01:00:00'\n"
+ + "UNION ALL\n"
+ + "SELECT\n"
+ + " f2\n"
+ + "FROM\n"
+ + " t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01
02:00:00'");
+ }
+
+ @Test
+ public void testTimeTravelWithLookupJoin() {
+ util.verifyExecPlan(
+ "SELECT\n"
+ + " l.f2,\n"
+ + " r.f3\n"
+ + "FROM\n"
+ + " (\n"
+ + " SELECT\n"
+ + " *,\n"
+ + " proctime () as p\n"
+ + " FROM\n"
+ + " t1 FOR SYSTEM_TIME AS OF TIMESTAMP
'2023-01-01 02:00:00'\n"
+ + " ) l\n"
+ + " LEFT JOIN t1 FOR SYSTEM_TIME AS OF l.p r ON
l.f1=r.f1");
+ }
+
+ @Test
+ public void testTimeTravelWithHints() {
+ util.verifyExecPlan(
+ "SELECT * FROM t1 /*+ options('bounded'='true') */ FOR
SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 02:00:00' AS t2");
+ }
+
+ @Test
+ public void testTimeTravelWithUnsupportedExpression() {
+ assertThatThrownBy(
+ () ->
+ util.tableEnv()
+ .executeSql(
+ "SELECT\n"
+ + " *\n"
+ + "FROM\n"
+ + " t1 FOR
SYSTEM_TIME AS OF TO_TIMESTAMP_LTZ (0, 3)"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Unsupported time travel expression:
TO_TIMESTAMP_LTZ(0, 3) for the expression can not be reduced to a constant by
Flink.");
+
+ assertThatThrownBy(
+ () ->
+ util.tableEnv()
+ .executeSql(
+ "SELECT\n"
+ + " *\n"
+ + "FROM\n"
+ + " t1 FOR
SYSTEM_TIME AS OF PROCTIME()"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Unsupported time travel expression: PROCTIME() for
the expression can not be reduced to a constant by Flink.");
+ }
+
+ @Test
+ public void testTimeTravelWithIdentifierSnapshot() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE\n"
+ + " t2 (f1 VARCHAR, f2 TIMESTAMP(3))\n"
+ + "WITH\n"
+ + " ('connector'='values',
'bounded'='true')");
+
+ // select snapshot with identifier only support in lookup join or
temporal join.
+ // The following query can't generate a validate execution plan.
+
+ assertThatThrownBy(
+ () ->
+ util.tableEnv()
+ .executeSql(
+ "SELECT\n"
+ + " *\n"
+ + "FROM\n"
+ + " t2 FOR
SYSTEM_TIME AS OF f2"))
+ .isInstanceOf(TableException.class)
+ .hasMessageContaining("Cannot generate a valid execution plan
for the given query");
+ }
+
+ @Test
+ public void testTimeTravelWithView() {
+ util.tableEnv().executeSql("CREATE VIEW tb_view AS SELECT * FROM t1");
+
+ assertThatThrownBy(
+ () ->
+ util.tableEnv()
+ .executeSql(
+ "SELECT\n"
+ + " *\n"
+ + "FROM\n"
+ + " tb_view FOR
SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 01:00:00'"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "TimeTravelCatalog.default.tb_view is a view, but time
travel is not supported for view.");
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/TimeTravelITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/TimeTravelITCase.java
index bc6e6d41fe8..49e0f08c3f9 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/TimeTravelITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/TimeTravelITCase.java
@@ -19,58 +19,26 @@
package org.apache.flink.table.planner.runtime.batch.sql;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.planner.factories.TestTimeTravelCatalog;
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.table.planner.runtime.utils.TimeTravelTestUtil;
+import org.apache.flink.table.planner.utils.DateTimeTestUtil;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.time.LocalDateTime;
import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
/** Test for time travel. */
public class TimeTravelITCase extends BatchTestBase {
- private static final List<Tuple3<String, Schema, List<Row>>>
TEST_TIME_TRAVEL_DATE =
- Arrays.asList(
- Tuple3.of(
- "2023-01-01 01:00:00",
- Schema.newBuilder().column("f1",
DataTypes.INT()).build(),
- Collections.singletonList(Row.of(1))),
- Tuple3.of(
- "2023-01-01 02:00:00",
- Schema.newBuilder()
- .column("f1", DataTypes.INT())
- .column("f2", DataTypes.INT())
- .build(),
- Collections.singletonList(Row.of(1, 2))),
- Tuple3.of(
- "2023-01-01 03:00:00",
- Schema.newBuilder()
- .column("f1", DataTypes.INT())
- .column("f2", DataTypes.INT())
- .column("f3", DataTypes.INT())
- .build(),
- Collections.singletonList(Row.of(1, 2, 3))));
-
private static final List<Tuple2<String, String>>
EXPECTED_TIME_TRAVEL_RESULT =
Arrays.asList(
Tuple2.of("2023-01-01 01:00:00", "[+I[1]]"),
@@ -80,24 +48,12 @@ public class TimeTravelITCase extends BatchTestBase {
@BeforeEach
@Override
public void before() {
- TestTimeTravelCatalog catalog = new
TestTimeTravelCatalog("TimeTravelCatalog");
+ String catalogName = "TimeTravelCatalog";
+ TestTimeTravelCatalog catalog =
+
TimeTravelTestUtil.getTestingCatalogWithVersionedTable(catalogName, "t1");
- TEST_TIME_TRAVEL_DATE.forEach(
- t -> {
- String dataId = TestValuesTableFactory.registerData(t.f2);
- Map<String, String> options = new HashMap<>();
- options.put("connector", "values");
- options.put("bounded", "true");
- options.put("data-id", dataId);
- try {
- catalog.registerTableForTimeTravel(
- "t1", t.f1, options, convertStringToLong(t.f0,
ZoneId.of("UTC")));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- tEnv().registerCatalog("TimeTravelCatalog", catalog);
- tEnv().useCatalog("TimeTravelCatalog");
+ tEnv().registerCatalog(catalogName, catalog);
+ tEnv().useCatalog(catalogName);
tEnv().getConfig().setLocalTimeZone(ZoneId.of("UTC"));
}
@@ -155,8 +111,9 @@ public class TimeTravelITCase extends BatchTestBase {
+ " *\n"
+ "FROM\n"
+ " t1 FOR SYSTEM_TIME
AS OF TIMESTAMP '%s' AS t2",
- timezoneConvert(
+ DateTimeTestUtil.timezoneConvert(
res.f0,
+ "yyyy-MM-dd HH:mm:ss",
ZoneId.of("UTC"),
ZoneId.of("Asia/Shanghai"))));
List<String> sortedResult = toSortedResults(tableResult);
@@ -218,69 +175,6 @@ public class TimeTravelITCase extends BatchTestBase {
assertEquals("[+I[2, 3]]", sortedResult.toString());
}
- @Test
- void testTimeTravelWithUnsupportedExpression() {
- assertThatThrownBy(
- () ->
- tEnv().executeSql(
- "SELECT\n"
- + " *\n"
- + "FROM\n"
- + " t1 FOR
SYSTEM_TIME AS OF TO_TIMESTAMP_LTZ (0, 3)"))
- .isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "Unsupported time travel expression:
TO_TIMESTAMP_LTZ(0, 3) for the expression can not be reduced to a constant by
Flink.");
-
- assertThatThrownBy(
- () ->
- tEnv().executeSql(
- "SELECT\n"
- + " *\n"
- + "FROM\n"
- + " t1 FOR
SYSTEM_TIME AS OF PROCTIME()"))
- .isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "Unsupported time travel expression: PROCTIME() for
the expression can not be reduced to a constant by Flink.");
- }
-
- @Test
- void testTimeTravelWithIdentifierSnapshot() {
- tEnv().executeSql(
- "CREATE TABLE\n"
- + " t2 (f1 VARCHAR, f2 TIMESTAMP(3))\n"
- + "WITH\n"
- + " ('connector'='values',
'bounded'='true')");
-
- // select snapshot with identifier only support in lookup join or
temporal join.
- // The following query can't generate a validate execution plan.
-
- assertThatThrownBy(
- () ->
- tEnv().executeSql(
- "SELECT\n"
- + " *\n"
- + "FROM\n"
- + " t2 FOR
SYSTEM_TIME AS OF f2"))
- .isInstanceOf(TableException.class)
- .hasMessageContaining("Cannot generate a valid execution plan
for the given query");
- }
-
- @Test
- void testTimeTravelWithView() {
- tEnv().executeSql("CREATE VIEW tb_view AS SELECT * FROM t1");
-
- assertThatThrownBy(
- () ->
- tEnv().executeSql(
- "SELECT\n"
- + " *\n"
- + "FROM\n"
- + " tb_view FOR
SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 01:00:00'"))
- .isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "TimeTravelCatalog.default.tb_view is a view, but time
travel is not supported for view.");
- }
-
@Test
void testTimeTravelWithHints() {
TableResult tableResult =
@@ -298,23 +192,6 @@ public class TimeTravelITCase extends BatchTestBase {
assertEquals("[+I[1, 2]]", sortedResult.toString());
}
- private static Long convertStringToLong(String timestamp, ZoneId zoneId) {
- return LocalDateTime.parse(timestamp,
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
- .atZone(zoneId)
- .toInstant()
- .toEpochMilli();
- }
-
- private static String timezoneConvert(
- String timestamp, ZoneId originZoneId, ZoneId convrtedZoneId) {
- return LocalDateTime.parse(timestamp,
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
- .atZone(originZoneId)
- .toInstant()
- .atZone(convrtedZoneId)
- .toLocalDateTime()
- .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
- }
-
private List<String> toSortedResults(TableResult result) {
return CollectionUtil.iteratorToList(result.collect()).stream()
.map(Row::toString)
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/TimeTravelTestUtil.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/TimeTravelTestUtil.java
new file mode 100644
index 00000000000..51c88177ee9
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/TimeTravelTestUtil.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.utils;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.planner.factories.TestTimeTravelCatalog;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.utils.DateTimeTestUtil;
+import org.apache.flink.types.Row;
+
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Shared fixtures for time travel tests: three timestamped versions of one table. */
+public class TimeTravelTestUtil {
+    /**
+     * Versions of the test table as (timestamp string, schema, rows). Each later version adds one
+     * more INT column and holds a single row.
+     */
+    public static final List<Tuple3<String, Schema, List<Row>>> TEST_TIME_TRAVEL_DATE =
+            Arrays.asList(
+                    version("2023-01-01 01:00:00", Row.of(1), "f1"),
+                    version("2023-01-01 02:00:00", Row.of(1, 2), "f1", "f2"),
+                    version("2023-01-01 03:00:00", Row.of(1, 2, 3), "f1", "f2", "f3"));
+
+    /**
+     * Creates a {@link TestTimeTravelCatalog} named {@code catalogName} and registers every entry
+     * of {@link #TEST_TIME_TRAVEL_DATE} under {@code tableName}, using the entry's timestamp
+     * (interpreted in UTC) as epoch milliseconds.
+     *
+     * @throws RuntimeException wrapping any exception raised while registering a version
+     */
+    public static TestTimeTravelCatalog getTestingCatalogWithVersionedTable(
+            String catalogName, String tableName) {
+        final TestTimeTravelCatalog catalog = new TestTimeTravelCatalog(catalogName);
+
+        for (Tuple3<String, Schema, List<Row>> entry : TEST_TIME_TRAVEL_DATE) {
+            final String dataId = TestValuesTableFactory.registerData(entry.f2);
+            final Map<String, String> options = new HashMap<>();
+            options.put("connector", "values");
+            options.put("bounded", "true");
+            options.put("data-id", dataId);
+            try {
+                final long snapshotMillis =
+                        DateTimeTestUtil.toEpochMills(
+                                entry.f0, "yyyy-MM-dd HH:mm:ss", ZoneId.of("UTC"));
+                catalog.registerTableForTimeTravel(tableName, entry.f1, options, snapshotMillis);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return catalog;
+    }
+
+    /** Builds one fixture entry whose schema consists of the given INT columns. */
+    private static Tuple3<String, Schema, List<Row>> version(
+            String timestamp, Row row, String... intColumns) {
+        final Schema.Builder schema = Schema.newBuilder();
+        for (String column : intColumns) {
+            schema.column(column, DataTypes.INT());
+        }
+        return Tuple3.of(timestamp, schema.build(), Collections.singletonList(row));
+    }
+}
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.xml
new file mode 100644
index 00000000000..efc3464500f
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.xml
@@ -0,0 +1,170 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testTimeTravel">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01
02:00:00']]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(f1=[$0], f2=[$1])
++- LogicalSnapshot(period=[2023-01-01 02:00:00])
+ +- LogicalTableScan(table=[[TimeTravelCatalog, default, t1]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+TableSourceScan(table=[[TimeTravelCatalog, default, t1]], fields=[f1, f2],
version=[1672538400000])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTimeTravelOneTableMultiTimes">
+ <Resource name="sql">
+ <![CDATA[SELECT
+ f1
+FROM
+ t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 01:00:00'
+UNION ALL
+SELECT
+ f2
+FROM
+ t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 02:00:00']]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalUnion(all=[true])
+:- LogicalProject(f1=[$0])
+: +- LogicalSnapshot(period=[2023-01-01 01:00:00])
+: +- LogicalTableScan(table=[[TimeTravelCatalog, default, t1]])
++- LogicalProject(f2=[$1])
+ +- LogicalSnapshot(period=[2023-01-01 02:00:00])
+ +- LogicalTableScan(table=[[TimeTravelCatalog, default, t1]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Union(all=[true], union=[f1])
+:- TableSourceScan(table=[[TimeTravelCatalog, default, t1]], fields=[f1],
version=[1672534800000])
++- TableSourceScan(table=[[TimeTravelCatalog, default, t1, project=[f2],
metadata=[]]], fields=[f2], version=[1672538400000])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTimeTravelWithDifferentTimezone">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01
10:00:00']]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(f1=[$0], f2=[$1])
++- LogicalSnapshot(period=[2023-01-01 10:00:00])
+ +- LogicalTableScan(table=[[TimeTravelCatalog, default, t1]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+TableSourceScan(table=[[TimeTravelCatalog, default, t1]], fields=[f1, f2],
version=[1672538400000])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTimeTravelWithAsExpression">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01
02:00:00' AS t2]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(f1=[$0], f2=[$1])
++- LogicalSnapshot(period=[2023-01-01 02:00:00])
+ +- LogicalTableScan(table=[[TimeTravelCatalog, default, t1]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+TableSourceScan(table=[[TimeTravelCatalog, default, t1, project=[f1, f2],
metadata=[]]], fields=[f1, f2], version=[1672542000000])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTimeTravelWithHints">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM t1 /*+ options('bounded'='true') */ FOR
SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 02:00:00' AS t2]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(f1=[$0], f2=[$1])
++- LogicalSnapshot(period=[2023-01-01 02:00:00])
+ +- LogicalTableScan(table=[[TimeTravelCatalog, default, t1]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+TableSourceScan(table=[[TimeTravelCatalog, default, t1]], fields=[f1, f2],
version=[1672538400000])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTimeTravelWithLookupJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT
+ l.f2,
+ r.f3
+FROM
+ (
+ SELECT
+ *,
+ proctime () as p
+ FROM
+ t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 02:00:00'
+ ) l
+ LEFT JOIN t1 FOR SYSTEM_TIME AS OF l.p r ON l.f1=r.f1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(f2=[$1], f3=[$5])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0,
2}])
+ :- LogicalProject(f1=[$0], f2=[$1], p=[PROCTIME()])
+ : +- LogicalSnapshot(period=[2023-01-01 02:00:00])
+ : +- LogicalTableScan(table=[[TimeTravelCatalog, default, t1]])
+ +- LogicalFilter(condition=[=($cor0.f1, $0)])
+ +- LogicalSnapshot(period=[$cor0.p])
+ +- LogicalTableScan(table=[[TimeTravelCatalog, default, t1]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[f2, f3])
++- LookupJoin(table=[TimeTravelCatalog.default.t1], joinType=[LeftOuterJoin],
lookup=[f1=f1], select=[f1, f2, f10, f3])
+ +- TableSourceScan(table=[[TimeTravelCatalog, default, t1]], fields=[f1,
f2], version=[1672538400000])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTimeTravelWithSimpleExpression">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01
00:00:00'+INTERVAL '60' DAY]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(f1=[$0], f2=[$1], f3=[$2])
++- LogicalSnapshot(period=[2023-03-02 00:00:00])
+ +- LogicalTableScan(table=[[TimeTravelCatalog, default, t1]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+TableSourceScan(table=[[TimeTravelCatalog, default, t1]], fields=[f1, f2, f3],
version=[1672542000000])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/DateTimeTestUtil.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/DateTimeTestUtil.scala
index ab932f5ed69..a6b8a7cbe98 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/DateTimeTestUtil.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/DateTimeTestUtil.scala
@@ -21,6 +21,7 @@ import
org.apache.flink.table.data.util.DataFormatConverters.{LocalDateConverter
import org.apache.flink.table.utils.DateTimeUtils
import java.time.{LocalDate, LocalDateTime, LocalTime, ZoneId}
+import java.time.format.DateTimeFormatter
object DateTimeTestUtil {
@@ -51,4 +52,20 @@ object DateTimeTestUtil {
def toEpochMills(s: String, zone: ZoneId): Long = {
LocalDateTime.parse(s).atZone(zone).toInstant.toEpochMilli
}
+
+  /** Parses `s` with the pattern `format`, places it in `zone`, and returns its epoch millis. */
+  def toEpochMills(s: String, format: String, zone: ZoneId): Long = {
+    val formatter = DateTimeFormatter.ofPattern(format)
+    val localDateTime = LocalDateTime.parse(s, formatter)
+    localDateTime.atZone(zone).toInstant.toEpochMilli
+  }
+
+  /**
+   * Reads the wall-clock timestamp `s` in `fromZone` and renders the same instant as wall-clock
+   * time in `toZone`. The pattern `format` is used for both parsing and rendering.
+   */
+  def timezoneConvert(s: String, format: String, fromZone: ZoneId, toZone: ZoneId): String = {
+    val formatter = DateTimeFormatter.ofPattern(format)
+    val instant = LocalDateTime.parse(s, formatter).atZone(fromZone).toInstant
+    instant.atZone(toZone).toLocalDateTime.format(formatter)
+  }
}