This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 17be3716be4 [FLINK-32646][table-planner] Fix TestTimeTravelCatalog missing snapshot version and add version info into source table's digest
17be3716be4 is described below

commit 17be3716be42cd9dc5986e97ae222971d9d19be7
Author: lincoln lee <[email protected]>
AuthorDate: Sun Jul 23 06:21:12 2023 +0800

    [FLINK-32646][table-planner] Fix TestTimeTravelCatalog missing snapshot version and add version info into source table's digest
    
    This closes #23047
---
 .../common/CommonPhysicalTableSourceScan.scala     |  13 ++
 .../planner/factories/TestTimeTravelCatalog.java   |   2 +-
 .../planner/plan/batch/sql/TimeTravelTest.java     | 191 +++++++++++++++++++++
 .../runtime/batch/sql/TimeTravelITCase.java        | 141 +--------------
 .../planner/runtime/utils/TimeTravelTestUtil.java  |  84 +++++++++
 .../planner/plan/batch/sql/TimeTravelTest.xml      | 170 ++++++++++++++++++
 .../table/planner/utils/DateTimeTestUtil.scala     |  17 ++
 7 files changed, 485 insertions(+), 133 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalTableSourceScan.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalTableSourceScan.scala
index 68ffef98570..fd98515326a 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalTableSourceScan.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalTableSourceScan.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.planner.plan.nodes.physical.common
 
+import org.apache.flink.table.catalog.{CatalogBaseTable, CatalogTable}
 import org.apache.flink.table.connector.source.ScanTableSource
 import org.apache.flink.table.planner.plan.schema.TableSourceTable
 import org.apache.flink.table.planner.plan.utils.RelExplainUtil
@@ -51,9 +52,21 @@ abstract class CommonPhysicalTableSourceScan(
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
+    val version = extractSnapshotVersion()
     super
       .explainTerms(pw)
       .item("fields", getRowType.getFieldNames.asScala.mkString(", "))
       .itemIf("hints", RelExplainUtil.hintsToString(getHints), 
!getHints.isEmpty)
+      .itemIf("version", version.getOrElse(""), version.isDefined)
+  }
+
+  private def extractSnapshotVersion(): Option[String] = {
+    val originTable: CatalogBaseTable =
+      relOptTable.contextResolvedTable.getTable.asInstanceOf[CatalogBaseTable]
+    originTable match {
+      case catalogTable: CatalogTable if catalogTable.getSnapshot.isPresent =>
+        Option(catalogTable.getSnapshot.get().toString)
+      case _ => Option.empty
+    }
   }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestTimeTravelCatalog.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestTimeTravelCatalog.java
index e5c6336ffe7..6b08c7a1682 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestTimeTravelCatalog.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestTimeTravelCatalog.java
@@ -77,7 +77,7 @@ public class TestTimeTravelCatalog extends 
GenericInMemoryCatalog {
             String tableName, Schema schema, Map<String, String> properties, 
long timestamp)
             throws TableAlreadyExistException, DatabaseNotExistException, 
TableNotExistException {
         CatalogTable catalogTable =
-                CatalogTable.of(schema, "", Collections.emptyList(), 
properties);
+                CatalogTable.of(schema, "", Collections.emptyList(), 
properties, timestamp);
         ObjectPath objectPath = new ObjectPath(getDefaultDatabase(), 
tableName);
         if (!timeTravelTables.containsKey(objectPath)) {
             timeTravelTables.put(objectPath, new ArrayList<>());
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.java
new file mode 100644
index 00000000000..92b0a16bbe6
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.factories.TestTimeTravelCatalog;
+import org.apache.flink.table.planner.runtime.utils.TimeTravelTestUtil;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.DateTimeTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.ZoneId;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Plan tests for time travel. */
+public class TimeTravelTest extends TableTestBase {
+
+    private BatchTableTestUtil util;
+
+    @Before
+    public void before() {
+        util = batchTestUtil(TableConfig.getDefault());
+        String catalogName = "TimeTravelCatalog";
+        TestTimeTravelCatalog catalog =
+                
TimeTravelTestUtil.getTestingCatalogWithVersionedTable(catalogName, "t1");
+        TableEnvironment tEnv = util.tableEnv();
+        tEnv.registerCatalog(catalogName, catalog);
+        tEnv.useCatalog(catalogName);
+        tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
+    }
+
+    @Test
+    public void testTimeTravel() {
+        util.verifyExecPlan(
+                "SELECT * FROM t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 
02:00:00'");
+    }
+
+    @Test
+    public void testTimeTravelWithAsExpression() {
+        util.verifyExecPlan(
+                "SELECT * FROM t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 
02:00:00' AS t2");
+    }
+
+    @Test
+    public void testTimeTravelWithSimpleExpression() {
+        util.verifyExecPlan(
+                "SELECT * FROM t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 
00:00:00'+INTERVAL '60' DAY");
+    }
+
+    @Test
+    public void testTimeTravelWithDifferentTimezone() {
+        
util.tableEnv().getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
+
+        util.verifyExecPlan(
+                String.format(
+                        "SELECT * FROM t1 FOR SYSTEM_TIME AS OF TIMESTAMP 
'%s'",
+                        DateTimeTestUtil.timezoneConvert(
+                                "2023-01-01 02:00:00",
+                                "yyyy-MM-dd HH:mm:ss",
+                                ZoneId.of("UTC"),
+                                ZoneId.of("Asia/Shanghai"))));
+    }
+
+    @Test
+    public void testTimeTravelOneTableMultiTimes() {
+        util.verifyExecPlan(
+                "SELECT\n"
+                        + "    f1\n"
+                        + "FROM\n"
+                        + "    t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 
01:00:00'\n"
+                        + "UNION ALL\n"
+                        + "SELECT\n"
+                        + "    f2\n"
+                        + "FROM\n"
+                        + "    t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 
02:00:00'");
+    }
+
+    @Test
+    public void testTimeTravelWithLookupJoin() {
+        util.verifyExecPlan(
+                "SELECT\n"
+                        + "    l.f2,\n"
+                        + "    r.f3\n"
+                        + "FROM\n"
+                        + "    (\n"
+                        + "        SELECT\n"
+                        + "            *,\n"
+                        + "            proctime () as p\n"
+                        + "        FROM\n"
+                        + "            t1 FOR SYSTEM_TIME AS OF TIMESTAMP 
'2023-01-01 02:00:00'\n"
+                        + "    ) l\n"
+                        + "    LEFT JOIN t1 FOR SYSTEM_TIME AS OF l.p r ON 
l.f1=r.f1");
+    }
+
+    @Test
+    public void testTimeTravelWithHints() {
+        util.verifyExecPlan(
+                "SELECT * FROM t1 /*+ options('bounded'='true') */ FOR 
SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 02:00:00' AS t2");
+    }
+
+    @Test
+    public void testTimeTravelWithUnsupportedExpression() {
+        assertThatThrownBy(
+                        () ->
+                                util.tableEnv()
+                                        .executeSql(
+                                                "SELECT\n"
+                                                        + "    *\n"
+                                                        + "FROM\n"
+                                                        + "    t1 FOR 
SYSTEM_TIME AS OF TO_TIMESTAMP_LTZ (0, 3)"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Unsupported time travel expression: 
TO_TIMESTAMP_LTZ(0, 3) for the expression can not be reduced to a constant by 
Flink.");
+
+        assertThatThrownBy(
+                        () ->
+                                util.tableEnv()
+                                        .executeSql(
+                                                "SELECT\n"
+                                                        + "    *\n"
+                                                        + "FROM\n"
+                                                        + "    t1 FOR 
SYSTEM_TIME AS OF PROCTIME()"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Unsupported time travel expression: PROCTIME() for 
the expression can not be reduced to a constant by Flink.");
+    }
+
+    @Test
+    public void testTimeTravelWithIdentifierSnapshot() {
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE\n"
+                                + "    t2 (f1 VARCHAR, f2 TIMESTAMP(3))\n"
+                                + "WITH\n"
+                                + "    ('connector'='values', 
'bounded'='true')");
+
+        // Selecting a snapshot by identifier is only supported in lookup joins and temporal
+        // joins, so the following query cannot produce a valid execution plan.
+
+        assertThatThrownBy(
+                        () ->
+                                util.tableEnv()
+                                        .executeSql(
+                                                "SELECT\n"
+                                                        + "    *\n"
+                                                        + "FROM\n"
+                                                        + "    t2 FOR 
SYSTEM_TIME AS OF f2"))
+                .isInstanceOf(TableException.class)
+                .hasMessageContaining("Cannot generate a valid execution plan 
for the given query");
+    }
+
+    @Test
+    public void testTimeTravelWithView() {
+        util.tableEnv().executeSql("CREATE VIEW tb_view AS SELECT * FROM t1");
+
+        assertThatThrownBy(
+                        () ->
+                                util.tableEnv()
+                                        .executeSql(
+                                                "SELECT\n"
+                                                        + "    *\n"
+                                                        + "FROM\n"
+                                                        + "    tb_view FOR 
SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 01:00:00'"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "TimeTravelCatalog.default.tb_view is a view, but time 
travel is not supported for view.");
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/TimeTravelITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/TimeTravelITCase.java
index bc6e6d41fe8..49e0f08c3f9 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/TimeTravelITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/TimeTravelITCase.java
@@ -19,58 +19,26 @@
 package org.apache.flink.table.planner.runtime.batch.sql;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.planner.factories.TestTimeTravelCatalog;
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.table.planner.runtime.utils.TimeTravelTestUtil;
+import org.apache.flink.table.planner.utils.DateTimeTestUtil;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.time.LocalDateTime;
 import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** Test for time travel. */
 public class TimeTravelITCase extends BatchTestBase {
-    private static final List<Tuple3<String, Schema, List<Row>>> 
TEST_TIME_TRAVEL_DATE =
-            Arrays.asList(
-                    Tuple3.of(
-                            "2023-01-01 01:00:00",
-                            Schema.newBuilder().column("f1", 
DataTypes.INT()).build(),
-                            Collections.singletonList(Row.of(1))),
-                    Tuple3.of(
-                            "2023-01-01 02:00:00",
-                            Schema.newBuilder()
-                                    .column("f1", DataTypes.INT())
-                                    .column("f2", DataTypes.INT())
-                                    .build(),
-                            Collections.singletonList(Row.of(1, 2))),
-                    Tuple3.of(
-                            "2023-01-01 03:00:00",
-                            Schema.newBuilder()
-                                    .column("f1", DataTypes.INT())
-                                    .column("f2", DataTypes.INT())
-                                    .column("f3", DataTypes.INT())
-                                    .build(),
-                            Collections.singletonList(Row.of(1, 2, 3))));
-
     private static final List<Tuple2<String, String>> 
EXPECTED_TIME_TRAVEL_RESULT =
             Arrays.asList(
                     Tuple2.of("2023-01-01 01:00:00", "[+I[1]]"),
@@ -80,24 +48,12 @@ public class TimeTravelITCase extends BatchTestBase {
     @BeforeEach
     @Override
     public void before() {
-        TestTimeTravelCatalog catalog = new 
TestTimeTravelCatalog("TimeTravelCatalog");
+        String catalogName = "TimeTravelCatalog";
+        TestTimeTravelCatalog catalog =
+                
TimeTravelTestUtil.getTestingCatalogWithVersionedTable(catalogName, "t1");
 
-        TEST_TIME_TRAVEL_DATE.forEach(
-                t -> {
-                    String dataId = TestValuesTableFactory.registerData(t.f2);
-                    Map<String, String> options = new HashMap<>();
-                    options.put("connector", "values");
-                    options.put("bounded", "true");
-                    options.put("data-id", dataId);
-                    try {
-                        catalog.registerTableForTimeTravel(
-                                "t1", t.f1, options, convertStringToLong(t.f0, 
ZoneId.of("UTC")));
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
-                });
-        tEnv().registerCatalog("TimeTravelCatalog", catalog);
-        tEnv().useCatalog("TimeTravelCatalog");
+        tEnv().registerCatalog(catalogName, catalog);
+        tEnv().useCatalog(catalogName);
         tEnv().getConfig().setLocalTimeZone(ZoneId.of("UTC"));
     }
 
@@ -155,8 +111,9 @@ public class TimeTravelITCase extends BatchTestBase {
                                                     + "    *\n"
                                                     + "FROM\n"
                                                     + "    t1 FOR SYSTEM_TIME 
AS OF TIMESTAMP '%s' AS t2",
-                                            timezoneConvert(
+                                            DateTimeTestUtil.timezoneConvert(
                                                     res.f0,
+                                                    "yyyy-MM-dd HH:mm:ss",
                                                     ZoneId.of("UTC"),
                                                     
ZoneId.of("Asia/Shanghai"))));
             List<String> sortedResult = toSortedResults(tableResult);
@@ -218,69 +175,6 @@ public class TimeTravelITCase extends BatchTestBase {
         assertEquals("[+I[2, 3]]", sortedResult.toString());
     }
 
-    @Test
-    void testTimeTravelWithUnsupportedExpression() {
-        assertThatThrownBy(
-                        () ->
-                                tEnv().executeSql(
-                                                "SELECT\n"
-                                                        + "    *\n"
-                                                        + "FROM\n"
-                                                        + "    t1 FOR 
SYSTEM_TIME AS OF TO_TIMESTAMP_LTZ (0, 3)"))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Unsupported time travel expression: 
TO_TIMESTAMP_LTZ(0, 3) for the expression can not be reduced to a constant by 
Flink.");
-
-        assertThatThrownBy(
-                        () ->
-                                tEnv().executeSql(
-                                                "SELECT\n"
-                                                        + "    *\n"
-                                                        + "FROM\n"
-                                                        + "    t1 FOR 
SYSTEM_TIME AS OF PROCTIME()"))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Unsupported time travel expression: PROCTIME() for 
the expression can not be reduced to a constant by Flink.");
-    }
-
-    @Test
-    void testTimeTravelWithIdentifierSnapshot() {
-        tEnv().executeSql(
-                        "CREATE TABLE\n"
-                                + "    t2 (f1 VARCHAR, f2 TIMESTAMP(3))\n"
-                                + "WITH\n"
-                                + "    ('connector'='values', 
'bounded'='true')");
-
-        // select snapshot with identifier only support in lookup join or 
temporal join.
-        // The following query can't generate a validate execution plan.
-
-        assertThatThrownBy(
-                        () ->
-                                tEnv().executeSql(
-                                                "SELECT\n"
-                                                        + "    *\n"
-                                                        + "FROM\n"
-                                                        + "    t2 FOR 
SYSTEM_TIME AS OF f2"))
-                .isInstanceOf(TableException.class)
-                .hasMessageContaining("Cannot generate a valid execution plan 
for the given query");
-    }
-
-    @Test
-    void testTimeTravelWithView() {
-        tEnv().executeSql("CREATE VIEW tb_view AS SELECT * FROM t1");
-
-        assertThatThrownBy(
-                        () ->
-                                tEnv().executeSql(
-                                                "SELECT\n"
-                                                        + "    *\n"
-                                                        + "FROM\n"
-                                                        + "    tb_view FOR 
SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 01:00:00'"))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "TimeTravelCatalog.default.tb_view is a view, but time 
travel is not supported for view.");
-    }
-
     @Test
     void testTimeTravelWithHints() {
         TableResult tableResult =
@@ -298,23 +192,6 @@ public class TimeTravelITCase extends BatchTestBase {
         assertEquals("[+I[1, 2]]", sortedResult.toString());
     }
 
-    private static Long convertStringToLong(String timestamp, ZoneId zoneId) {
-        return LocalDateTime.parse(timestamp, 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
-                .atZone(zoneId)
-                .toInstant()
-                .toEpochMilli();
-    }
-
-    private static String timezoneConvert(
-            String timestamp, ZoneId originZoneId, ZoneId convrtedZoneId) {
-        return LocalDateTime.parse(timestamp, 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
-                .atZone(originZoneId)
-                .toInstant()
-                .atZone(convrtedZoneId)
-                .toLocalDateTime()
-                .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
-    }
-
     private List<String> toSortedResults(TableResult result) {
         return CollectionUtil.iteratorToList(result.collect()).stream()
                 .map(Row::toString)
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/TimeTravelTestUtil.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/TimeTravelTestUtil.java
new file mode 100644
index 00000000000..51c88177ee9
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/TimeTravelTestUtil.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.utils;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.planner.factories.TestTimeTravelCatalog;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.utils.DateTimeTestUtil;
+import org.apache.flink.types.Row;
+
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Test data for time travel. */
+public class TimeTravelTestUtil {
+    public static final List<Tuple3<String, Schema, List<Row>>> 
TEST_TIME_TRAVEL_DATE =
+            Arrays.asList(
+                    Tuple3.of(
+                            "2023-01-01 01:00:00",
+                            Schema.newBuilder().column("f1", 
DataTypes.INT()).build(),
+                            Collections.singletonList(Row.of(1))),
+                    Tuple3.of(
+                            "2023-01-01 02:00:00",
+                            Schema.newBuilder()
+                                    .column("f1", DataTypes.INT())
+                                    .column("f2", DataTypes.INT())
+                                    .build(),
+                            Collections.singletonList(Row.of(1, 2))),
+                    Tuple3.of(
+                            "2023-01-01 03:00:00",
+                            Schema.newBuilder()
+                                    .column("f1", DataTypes.INT())
+                                    .column("f2", DataTypes.INT())
+                                    .column("f3", DataTypes.INT())
+                                    .build(),
+                            Collections.singletonList(Row.of(1, 2, 3))));
+
+    public static TestTimeTravelCatalog getTestingCatalogWithVersionedTable(
+            String catalogName, String tableName) {
+        TestTimeTravelCatalog catalog = new TestTimeTravelCatalog(catalogName);
+
+        TEST_TIME_TRAVEL_DATE.forEach(
+                t -> {
+                    String dataId = TestValuesTableFactory.registerData(t.f2);
+                    Map<String, String> options = new HashMap<>();
+                    options.put("connector", "values");
+                    options.put("bounded", "true");
+                    options.put("data-id", dataId);
+                    try {
+                        catalog.registerTableForTimeTravel(
+                                tableName,
+                                t.f1,
+                                options,
+                                DateTimeTestUtil.toEpochMills(
+                                        t.f0, "yyyy-MM-dd HH:mm:ss", 
ZoneId.of("UTC")));
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+        return catalog;
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.xml
new file mode 100644
index 00000000000..efc3464500f
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.xml
@@ -0,0 +1,170 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testTimeTravel">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 
02:00:00']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(f1=[$0], f2=[$1])
++- LogicalSnapshot(period=[2023-01-01 02:00:00])
+   +- LogicalTableScan(table=[[TimeTravelCatalog, default, t1]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+TableSourceScan(table=[[TimeTravelCatalog, default, t1]], fields=[f1, f2], 
version=[1672538400000])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTimeTravelOneTableMultiTimes">
+    <Resource name="sql">
+      <![CDATA[SELECT
+    f1
+FROM
+    t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 01:00:00'
+UNION ALL
+SELECT
+    f2
+FROM
+    t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 02:00:00']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalUnion(all=[true])
+:- LogicalProject(f1=[$0])
+:  +- LogicalSnapshot(period=[2023-01-01 01:00:00])
+:     +- LogicalTableScan(table=[[TimeTravelCatalog, default, t1]])
++- LogicalProject(f2=[$1])
+   +- LogicalSnapshot(period=[2023-01-01 02:00:00])
+      +- LogicalTableScan(table=[[TimeTravelCatalog, default, t1]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Union(all=[true], union=[f1])
+:- TableSourceScan(table=[[TimeTravelCatalog, default, t1]], fields=[f1], 
version=[1672534800000])
++- TableSourceScan(table=[[TimeTravelCatalog, default, t1, project=[f2], 
metadata=[]]], fields=[f2], version=[1672538400000])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTimeTravelWithDifferentTimezone">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 
10:00:00']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(f1=[$0], f2=[$1])
++- LogicalSnapshot(period=[2023-01-01 10:00:00])
+   +- LogicalTableScan(table=[[TimeTravelCatalog, default, t1]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+TableSourceScan(table=[[TimeTravelCatalog, default, t1]], fields=[f1, f2], 
version=[1672538400000])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTimeTravelWithAsExpression">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 
02:00:00' AS t2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(f1=[$0], f2=[$1])
++- LogicalSnapshot(period=[2023-01-01 02:00:00])
+   +- LogicalTableScan(table=[[TimeTravelCatalog, default, t1]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+TableSourceScan(table=[[TimeTravelCatalog, default, t1, project=[f1, f2], 
metadata=[]]], fields=[f1, f2], version=[1672542000000])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTimeTravelWithHints">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM t1 /*+ options('bounded'='true') */ FOR 
SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 02:00:00' AS t2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(f1=[$0], f2=[$1])
++- LogicalSnapshot(period=[2023-01-01 02:00:00])
+   +- LogicalTableScan(table=[[TimeTravelCatalog, default, t1]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+TableSourceScan(table=[[TimeTravelCatalog, default, t1]], fields=[f1, f2], 
version=[1672538400000])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTimeTravelWithLookupJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT
+    l.f2,
+    r.f3
+FROM
+    (
+        SELECT
+            *,
+            proctime () as p
+        FROM
+            t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 02:00:00'
+    ) l
+    LEFT JOIN t1 FOR SYSTEM_TIME AS OF l.p r ON l.f1=r.f1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(f2=[$1], f3=[$5])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 
2}])
+   :- LogicalProject(f1=[$0], f2=[$1], p=[PROCTIME()])
+   :  +- LogicalSnapshot(period=[2023-01-01 02:00:00])
+   :     +- LogicalTableScan(table=[[TimeTravelCatalog, default, t1]])
+   +- LogicalFilter(condition=[=($cor0.f1, $0)])
+      +- LogicalSnapshot(period=[$cor0.p])
+         +- LogicalTableScan(table=[[TimeTravelCatalog, default, t1]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[f2, f3])
++- LookupJoin(table=[TimeTravelCatalog.default.t1], joinType=[LeftOuterJoin], 
lookup=[f1=f1], select=[f1, f2, f10, f3])
+   +- TableSourceScan(table=[[TimeTravelCatalog, default, t1]], fields=[f1, 
f2], version=[1672538400000])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTimeTravelWithSimpleExpression">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM t1 FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 
00:00:00'+INTERVAL '60' DAY]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(f1=[$0], f2=[$1], f3=[$2])
++- LogicalSnapshot(period=[2023-03-02 00:00:00])
+   +- LogicalTableScan(table=[[TimeTravelCatalog, default, t1]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+TableSourceScan(table=[[TimeTravelCatalog, default, t1]], fields=[f1, f2, f3], 
version=[1672542000000])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/DateTimeTestUtil.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/DateTimeTestUtil.scala
index ab932f5ed69..a6b8a7cbe98 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/DateTimeTestUtil.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/DateTimeTestUtil.scala
@@ -21,6 +21,7 @@ import 
org.apache.flink.table.data.util.DataFormatConverters.{LocalDateConverter
 import org.apache.flink.table.utils.DateTimeUtils
 
 import java.time.{LocalDate, LocalDateTime, LocalTime, ZoneId}
+import java.time.format.DateTimeFormatter
 
 object DateTimeTestUtil {
 
@@ -51,4 +52,20 @@ object DateTimeTestUtil {
   def toEpochMills(s: String, zone: ZoneId): Long = {
     LocalDateTime.parse(s).atZone(zone).toInstant.toEpochMilli
   }
+
+  /** Returns the epoch milliseconds of `s`, parsed with pattern `format` in time zone `zone`. */
+  def toEpochMills(s: String, format: String, zone: ZoneId): Long = {
+    LocalDateTime.parse(s, 
DateTimeFormatter.ofPattern(format)).atZone(zone).toInstant.toEpochMilli
+  }
+
+  /** Converts timestamp `s` from `fromZone` to `toZone`, parsed and printed using `format`. */
+  def timezoneConvert(s: String, format: String, fromZone: ZoneId, toZone: 
ZoneId): String = {
+    LocalDateTime
+      .parse(s, DateTimeFormatter.ofPattern(format))
+      .atZone(fromZone)
+      .toInstant
+      .atZone(toZone)
+      .toLocalDateTime
+      .format(DateTimeFormatter.ofPattern(format))
+  }
 }


Reply via email to