This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.0 by this push:
     new 53a2886b484 [FLINK-38938][table] Restore of SQL job with `CURRENT_TIMESTAMP` fails with `CodeGenException`
53a2886b484 is described below

commit 53a2886b484a415fb8c972f87d5f2e1602675c8a
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Jan 19 12:39:14 2026 +0100

    [FLINK-38938][table] Restore of SQL job with `CURRENT_TIMESTAMP` fails with `CodeGenException`
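
    Restoring a compiled plan containing `CURRENT_TIMESTAMP` (serialized with
    niladic FUNCTION_ID syntax) previously failed code generation. A minimal
    reproduction sketch, mirroring the new restore test below (connector
    options omitted):

        CREATE TABLE t (a BIGINT) WITH (...);
        CREATE TABLE sink_t (a BIGINT) WITH (...);
        -- compile the plan, stop at a savepoint, then restore:
        INSERT INTO sink_t SELECT EXTRACT(YEAR FROM CURRENT_TIMESTAMP) / a FROM t;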
---
 .../sql/parser/validate/FlinkSqlConformance.java   |   2 +-
 .../functions/sql/FlinkSqlOperatorTable.java       | 109 +++++--------------
 .../table/planner/calcite/FlinkPlannerImpl.scala   |   4 +-
 .../plan/nodes/exec/common/CalcTestPrograms.java   |  19 ++++
 .../plan/nodes/exec/stream/CalcRestoreTest.java    |   3 +-
 .../nodes/exec/stream/PythonCalcJsonPlanTest.java  |  13 +++
 .../flink/table/planner/utils/DiffRepository.java  |  12 ++-
 .../plan/batch/sql/DynamicFunctionPlanTest.xml     |   8 +-
 .../testTimestampFunction.out                      | 102 ++++++++++++++++++
 .../plan/stream/sql/DynamicFunctionPlanTest.xml    |   8 +-
 .../planner/plan/stream/sql/MatchRecognizeTest.xml |   4 +-
 .../plan/stream/sql/NonDeterministicDagTest.xml    |  38 +++----
 .../plan/calc-current-timestamp.json               | 117 +++++++++++++++++++++
 .../calc-current-timestamp/savepoint/_metadata     | Bin 0 -> 5868 bytes
 14 files changed, 323 insertions(+), 116 deletions(-)

diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
index b1235d6a42c..16a6692abc4 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
@@ -114,7 +114,7 @@ public enum FlinkSqlConformance implements SqlConformance {
 
     @Override
     public boolean allowNiladicParentheses() {
-        return false;
+        return true;
     }
 
     @Override
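
With `allowNiladicParentheses()` returning true, the validator accepts the
parenthesized and the SQL-standard niladic spelling as the same operator, so a
plan serialized with either form resolves on restore. A sketch of the two
spellings (not a test from this patch):

    SELECT CURRENT_TIMESTAMP;    -- SQL-standard niladic form
    SELECT CURRENT_TIMESTAMP();  -- parenthesized form, now also accepted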
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 1d6ff95f3e9..b565f5764b0 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -84,94 +84,39 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 
             // register functions based on batch or streaming mode
             final FlinkSqlOperatorTable finalInstance = instance;
-            dynamicFunctions(isBatchMode)
-                    .forEach(
-                            f -> {
-                                finalInstance.register(f);
-                            });
+            dynamicFunctions(isBatchMode).forEach(finalInstance::register);
             cachedInstances.put(isBatchMode, finalInstance);
         }
         return instance;
     }
 
     public static List<SqlFunction> dynamicFunctions(boolean isBatchMode) {
-        List<SqlFunction> sqlFunctions =
-                Arrays.asList(
-                        new FlinkTimestampDynamicFunction(
-                                SqlStdOperatorTable.LOCALTIME.getName(),
-                                SqlTypeName.TIME,
-                                isBatchMode) {
-                            @Override
-                            public SqlSyntax getSyntax() {
-                                return SqlSyntax.FUNCTION;
-                            }
-                        },
-                        new FlinkTimestampDynamicFunction(
-                                SqlStdOperatorTable.LOCALTIME.getName(),
-                                SqlTypeName.TIME,
-                                isBatchMode),
-                        new FlinkTimestampDynamicFunction(
-                                SqlStdOperatorTable.CURRENT_TIME.getName(),
-                                SqlTypeName.TIME,
-                                isBatchMode) {
-                            @Override
-                            public SqlSyntax getSyntax() {
-                                return SqlSyntax.FUNCTION;
-                            }
-                        },
-                        new FlinkTimestampDynamicFunction(
-                                SqlStdOperatorTable.CURRENT_TIME.getName(),
-                                SqlTypeName.TIME,
-                                isBatchMode),
-                        new FlinkCurrentDateDynamicFunction(isBatchMode) {
-                            @Override
-                            public SqlSyntax getSyntax() {
-                                return SqlSyntax.FUNCTION;
-                            }
-                        },
-                        new FlinkCurrentDateDynamicFunction(isBatchMode),
-                        new FlinkTimestampWithPrecisionDynamicFunction(
-                                SqlStdOperatorTable.LOCALTIMESTAMP.getName(),
-                                SqlTypeName.TIMESTAMP,
-                                isBatchMode,
-                                3) {
-                            @Override
-                            public SqlSyntax getSyntax() {
-                                return SqlSyntax.FUNCTION;
-                            }
-                        },
-                        new FlinkTimestampWithPrecisionDynamicFunction(
-                                SqlStdOperatorTable.LOCALTIMESTAMP.getName(),
-                                SqlTypeName.TIMESTAMP,
-                                isBatchMode,
-                                3),
-                        new FlinkTimestampWithPrecisionDynamicFunction(
-                                SqlStdOperatorTable.CURRENT_TIMESTAMP.getName(),
-                                SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
-                                isBatchMode,
-                                3) {
-                            @Override
-                            public SqlSyntax getSyntax() {
-                                return SqlSyntax.FUNCTION;
-                            }
-                        },
-                        new FlinkTimestampWithPrecisionDynamicFunction(
-                                SqlStdOperatorTable.CURRENT_TIMESTAMP.getName(),
-                                SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
-                                isBatchMode,
-                                3),
-                        new FlinkTimestampWithPrecisionDynamicFunction(
-                                FlinkTimestampWithPrecisionDynamicFunction.NOW,
-                                SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
-                                isBatchMode,
-                                3) {
-                            @Override
-                            public SqlSyntax getSyntax() {
-                                return SqlSyntax.FUNCTION;
-                            }
-                        });
-
-        return sqlFunctions;
+        return List.of(
+                new FlinkTimestampDynamicFunction(
+                        SqlStdOperatorTable.LOCALTIME.getName(), SqlTypeName.TIME, isBatchMode),
+                new FlinkTimestampDynamicFunction(
+                        SqlStdOperatorTable.CURRENT_TIME.getName(), SqlTypeName.TIME, isBatchMode),
+                new FlinkCurrentDateDynamicFunction(isBatchMode),
+                new FlinkTimestampWithPrecisionDynamicFunction(
+                        SqlStdOperatorTable.LOCALTIMESTAMP.getName(),
+                        SqlTypeName.TIMESTAMP,
+                        isBatchMode,
+                        3),
+                new FlinkTimestampWithPrecisionDynamicFunction(
+                        SqlStdOperatorTable.CURRENT_TIMESTAMP.getName(),
+                        SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+                        isBatchMode,
+                        3),
+                new FlinkTimestampWithPrecisionDynamicFunction(
+                        FlinkTimestampWithPrecisionDynamicFunction.NOW,
+                        SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+                        isBatchMode,
+                        3) {
+                    @Override
+                    public SqlSyntax getSyntax() {
+                        return SqlSyntax.FUNCTION;
+                    }
+                });
     }
 
     private static void validateNoDynamicFunction(FlinkSqlOperatorTable instance)
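
With the conformance change above, a single registered instance per function
covers both spellings, so the duplicated anonymous subclasses overriding
getSyntax() are gone; only NOW keeps the FUNCTION-syntax override, since it is
not a SQL-standard niladic function and is always written with parentheses:

    SELECT CURRENT_TIMESTAMP;  -- niladic, parentheses optional
    SELECT NOW();              -- NOW always takes parentheses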
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 649ba8d90a2..9d8570848e7 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -21,6 +21,7 @@ import org.apache.flink.sql.parser.ExtendedSqlNode
 import org.apache.flink.sql.parser.ddl.{SqlCompilePlan, SqlReset, SqlSet, SqlUseModules}
 import org.apache.flink.sql.parser.dml._
 import org.apache.flink.sql.parser.dql._
+import org.apache.flink.sql.parser.validate.FlinkSqlConformance
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.planner.hint.FlinkHints
 import org.apache.flink.table.planner.parse.CalciteParser
@@ -104,7 +105,8 @@ class FlinkPlannerImpl(
       SqlValidator.Config.DEFAULT
         .withIdentifierExpansion(true)
         .withDefaultNullCollation(FlinkPlannerImpl.defaultNullCollation)
-        .withTypeCoercionEnabled(false),
+        .withTypeCoercionEnabled(false)
+        .withConformance(FlinkSqlConformance.DEFAULT),
       createToRelContext(),
       cluster,
       config
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
index 0ccbaccc7a0..0e5a1848faa 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
@@ -211,4 +211,23 @@ public class CalcTestPrograms {
                                     + "from source_t where "
                                     + "(udf1(a) > 0 or (a * b) < 100) and b > 
10")
                     .build();
+
+    public static final TableTestProgram CALC_CURRENT_TIMESTAMP =
+            TableTestProgram.of(
+                            "calc-current-timestamp", "validates basic calc with current timestamp")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema("a BIGINT")
+                                    .producedBeforeRestore(Row.of(100L))
+                                    .producedAfterRestore(Row.of(10000L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a BIGINT")
+                                    .consumedBeforeRestore(Row.of(20L))
+                                    .consumedAfterRestore(Row.of(0L))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT extract(year from current_timestamp) / a FROM t")
+                    .build();
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
index 80b5fde5e53..8de2e05e054 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
@@ -41,6 +41,7 @@ public class CalcRestoreTest extends RestoreTestBase {
                 CalcTestPrograms.CALC_PROJECT_PUSHDOWN,
                 CalcTestPrograms.CALC_SARG,
                 CalcTestPrograms.CALC_UDF_SIMPLE,
-                CalcTestPrograms.CALC_UDF_COMPLEX);
+                CalcTestPrograms.CALC_UDF_COMPLEX,
+                CalcTestPrograms.CALC_CURRENT_TIMESTAMP);
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java
index dbe3c840cb0..1b577132cf7 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java
@@ -78,4 +78,17 @@ class PythonCalcJsonPlanTest extends TableTestBase {
         tEnv.executeSql(sinkTableDdl);
         util.verifyJsonPlan("insert into MySink select a, b from MyTable where pyFunc(b, b + 1)");
     }
+
+    @Test
+    void testTimestampFunction() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + "  a bigint,\n"
+                        + "  b timestamp(3)\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'table-sink-class' = 'DEFAULT')";
+        tEnv.executeSql(sinkTableDdl);
+        util.verifyJsonPlan("insert into MySink select 1, current_timestamp");
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/DiffRepository.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/DiffRepository.java
index 5506dc6a75b..9e99e8f9043 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/DiffRepository.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/DiffRepository.java
@@ -21,7 +21,6 @@ import org.apache.calcite.avatica.util.Spaces;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 import org.apache.calcite.util.XmlOutput;
-import org.junit.jupiter.api.Assertions;
 import org.opentest4j.AssertionFailedError;
 import org.w3c.dom.CDATASection;
 import org.w3c.dom.Comment;
@@ -48,6 +47,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 // THIS FILE IS COPIED FROM APACHE CALCITE
 
 /**
@@ -435,7 +436,14 @@ public class DiffRepository {
                 // for largish snippets
                 String expected2Canonical = expected2.replace(Util.LINE_SEPARATOR, "\n");
                 String actualCanonical = actual.replace(Util.LINE_SEPARATOR, "\n");
-                Assertions.assertEquals(expected2Canonical, actualCanonical, tag);
+                assertThat(actualCanonical)
+                        .withFailMessage(
+                                () ->
+                                        "Expected: "
+                                                + expected2Canonical
+                                                + " but was: "
+                                                + actualCanonical)
+                        .isEqualTo(expected2Canonical);
             } catch (AssertionFailedError e) {
                 amend(testCaseName, expected, actual);
                 throw e;
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFunctionPlanTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFunctionPlanTest.xml
index c35914e6ebc..1405e31e772 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFunctionPlanTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFunctionPlanTest.xml
@@ -28,7 +28,7 @@ GROUP BY cat, gmt_date]]>
       <![CDATA[
 LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)], EXPR$3=[COUNT()])
 +- LogicalProject(cat=[$2], gmt_date=[$3], cnt=[$4])
-   +- LogicalFilter(condition=[=($3, CURRENT_DATE())])
+   +- LogicalFilter(condition=[=($3, CURRENT_DATE)])
       +- LogicalTableScan(table=[[default_catalog, default_database, src]])
 ]]>
     </Resource>
@@ -53,7 +53,7 @@ HashAggregate(isMerge=[false], groupBy=[cat], auxGrouping=[gmt_date], select=[ca
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4], ts=[$5], hh=[$6])
 +- LogicalFilter(condition=[AND(>(CAST($6):BIGINT, 12), LIKE($2, _UTF-16LE'fruit%'))])
-   +- LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4], ts=[$5], hh=[SUBSTR(CAST(LOCALTIME()):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL, 1, 2)])
+   +- LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4], ts=[$5], hh=[SUBSTR(CAST(LOCALTIME):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL, 1, 2)])
       +- LogicalTableScan(table=[[default_catalog, default_database, src]])
 ]]>
     </Resource>
@@ -77,7 +77,7 @@ GROUP BY cat, hh]]>
 LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)], EXPR$3=[COUNT()])
 +- LogicalProject(cat=[$2], hh=[$6], cnt=[$4])
    +- LogicalFilter(condition=[=(SUBSTR(CAST($5):VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 1, 2), $6)])
-      +- LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4], ts=[$5], hh=[SUBSTR(CAST(LOCALTIME()):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL, 1, 2)])
+      +- LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4], ts=[$5], hh=[SUBSTR(CAST(LOCALTIME):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL, 1, 2)])
          +- LogicalTableScan(table=[[default_catalog, default_database, src]])
 ]]>
     </Resource>
@@ -104,7 +104,7 @@ GROUP BY gmt_date, ts, cat]]>
 LogicalProject(gmt_date=[$0], ts=[$1], cat=[$2], EXPR$3=[SUBSTR(CAST($1):VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 1, 2)], EXPR$4=[$3])
 +- LogicalAggregate(group=[{0, 1, 2}], EXPR$4=[SUM($3)])
    +- LogicalProject(gmt_date=[$3], ts=[$5], cat=[$2], cnt=[$4])
-      +- LogicalFilter(condition=[AND(=($3, CURRENT_DATE()), =($2, _UTF-16LE'fruit'), =($5, CURRENT_TIME()))])
+      +- LogicalFilter(condition=[AND(=($3, CURRENT_DATE), =($2, _UTF-16LE'fruit'), =($5, CURRENT_TIME))])
          +- LogicalTableScan(table=[[default_catalog, default_database, src]])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testTimestampFunction.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testTimestampFunction.out
new file mode 100644
index 00000000000..0093419b98d
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testTimestampFunction.out
@@ -0,0 +1,102 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-values_1",
+    "tuples" : [ [ {
+      "kind" : "LITERAL",
+      "value" : 0,
+      "type" : "INT NOT NULL"
+    } ] ],
+    "outputType" : "ROW<`ZERO` INT NOT NULL>",
+    "description" : "Values(tuples=[[{ 0 }]])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "LITERAL",
+      "value" : 1,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "FUNCTION_ID",
+        "internalName" : "$CURRENT_TIMESTAMP$1",
+        "operands" : [ ],
+        "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT NOT NULL, `b` TIMESTAMP(3)>",
+    "description" : "Calc(select=[1 AS a, CAST(CURRENT_TIMESTAMP() AS 
TIMESTAMP(3)) AS b])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "TIMESTAMP(3)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "table-sink-class" : "DEFAULT"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT NOT NULL, `b` TIMESTAMP(3)>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, b])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DynamicFunctionPlanTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DynamicFunctionPlanTest.xml
index bc661188560..15d493d32f8 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DynamicFunctionPlanTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DynamicFunctionPlanTest.xml
@@ -28,7 +28,7 @@ GROUP BY cat, gmt_date]]>
       <![CDATA[
 LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)], EXPR$3=[COUNT()])
 +- LogicalProject(cat=[$2], gmt_date=[$3], cnt=[$4])
-   +- LogicalFilter(condition=[=($3, CURRENT_DATE())])
+   +- LogicalFilter(condition=[=($3, CURRENT_DATE)])
       +- LogicalTableScan(table=[[default_catalog, default_database, src]])
 ]]>
     </Resource>
@@ -53,7 +53,7 @@ GroupAggregate(groupBy=[cat, gmt_date], select=[cat, gmt_date, SUM(cnt) AS EXPR$
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4], ts=[$5], hh=[$6])
 +- LogicalFilter(condition=[AND(>(CAST($6):BIGINT, 12), LIKE($2, _UTF-16LE'fruit%'))])
-   +- LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4], ts=[$5], hh=[SUBSTR(CAST(LOCALTIME()):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL, 1, 2)])
+   +- LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4], ts=[$5], hh=[SUBSTR(CAST(LOCALTIME):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL, 1, 2)])
       +- LogicalTableScan(table=[[default_catalog, default_database, src]])
 ]]>
     </Resource>
@@ -78,7 +78,7 @@ GROUP BY cat, hh]]>
 LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)], EXPR$3=[COUNT()])
 +- LogicalProject(cat=[$2], hh=[$6], cnt=[$4])
    +- LogicalFilter(condition=[=(SUBSTR(CAST($5):VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 1, 2), $6)])
-      +- LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4], ts=[$5], hh=[SUBSTR(CAST(LOCALTIME()):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL, 1, 2)])
+      +- LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4], ts=[$5], hh=[SUBSTR(CAST(LOCALTIME):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL, 1, 2)])
          +- LogicalTableScan(table=[[default_catalog, default_database, src]])
 ]]>
     </Resource>
@@ -106,7 +106,7 @@ GROUP BY gmt_date, ts, cat]]>
 LogicalProject(gmt_date=[$0], ts=[$1], cat=[$2], EXPR$3=[SUBSTR(CAST($1):VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 1, 2)], EXPR$4=[$3])
 +- LogicalAggregate(group=[{0, 1, 2}], EXPR$4=[SUM($3)])
    +- LogicalProject(gmt_date=[$3], ts=[$5], cat=[$2], cnt=[$4])
-      +- LogicalFilter(condition=[AND(=($3, CURRENT_DATE()), =($2, _UTF-16LE'fruit'), =($5, CURRENT_TIME()))])
+      +- LogicalFilter(condition=[AND(=($3, CURRENT_DATE), =($2, _UTF-16LE'fruit'), =($5, CURRENT_TIME))])
          +- LogicalTableScan(table=[[default_catalog, default_database, src]])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.xml
index 0c9ca22456c..0c272d99242 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.xml
@@ -56,7 +56,7 @@ MATCH_RECOGNIZE (
     <Resource name="ast">
       <![CDATA[
 LogicalProject(symbol=[$0], dPrice=[$1], matchRowtime=[$2])
-+- LogicalMatch(partition=[[0]], order=[[1 ASC-nulls-first]], outputFields=[[symbol, dPrice, matchRowtime]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[_UTF-16LE'A'], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>=(PREV(A.$1, 0), -(CURRENT_TIMESTAMP(), 86400000:INTERVAL DAY))]], inputFields=[[symbol, matchRowtime, price, startTime]])
++- LogicalMatch(partition=[[0]], order=[[1 ASC-nulls-first]], outputFields=[[symbol, dPrice, matchRowtime]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[_UTF-16LE'A'], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>=(PREV(A.$1, 0), -(CURRENT_TIMESTAMP, 86400000:INTERVAL DAY))]], inputFields=[[symbol, matchRowtime, price, startTime]])
    +- LogicalProject(symbol=[$0], matchRowtime=[$1], price=[$2], startTime=[TUMBLE_START($3)])
       +- LogicalAggregate(group=[{0, 1, 2, 3}])
         +- LogicalProject(symbol=[$0], matchRowtime=[$3], price=[$1], $f3=[$TUMBLE($3, 3000:INTERVAL SECOND)])
@@ -67,7 +67,7 @@ LogicalProject(symbol=[$0], dPrice=[$1], matchRowtime=[$2])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Match(partitionBy=[symbol], orderBy=[matchRowtime ASC], measures=[FINAL(A.price) AS dPrice, FINAL(A.matchRowtime) AS matchRowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[_UTF-16LE'A'], define=[{A=>=(PREV(A.$1, 0), -(CURRENT_TIMESTAMP(), 86400000:INTERVAL DAY))}])
+Match(partitionBy=[symbol], orderBy=[matchRowtime ASC], measures=[FINAL(A.price) AS dPrice, FINAL(A.matchRowtime) AS matchRowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[_UTF-16LE'A'], define=[{A=>=(PREV(A.$1, 0), -(CURRENT_TIMESTAMP, 86400000:INTERVAL DAY))}])
 +- Exchange(distribution=[hash[symbol]])
    +- Calc(select=[symbol, matchRowtime, price, w$start AS startTime])
      +- GroupWindowAggregate(groupBy=[symbol, price, matchRowtime], window=[TumblingGroupWindow('w$, matchRowtime0, 3000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[symbol, price, matchRowtime, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
index 4a7f213d81f..74aff5b27f4 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
@@ -165,7 +165,7 @@ LogicalProject(a=[$0])
   LogicalFilter(condition=[>($1, 100)])
     LogicalTableScan(table=[[default_catalog, default_database, cdc]])
 }))])
-   +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+   +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
       +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
 ]]>
     </Resource>
@@ -203,7 +203,7 @@ LogicalProject(a=[$0])
   LogicalFilter(condition=[>($1, 100)])
     LogicalTableScan(table=[[default_catalog, default_database, cdc]])
 }))])
-   +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+   +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
       +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
 ]]>
     </Resource>
@@ -1958,7 +1958,7 @@ group by `day`
       <![CDATA[
 LogicalAggregate(group=[{0}], cnt=[COUNT()], qmt=[SUM($1)])
 +- LogicalProject(day=[$4], b=[$1])
-   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], day=[CONCAT(CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], day=[CONCAT(CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL, DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))])
       +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
 ]]>
     </Resource>
@@ -1989,7 +1989,7 @@ LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, cn
 +- LogicalProject(a=[$1], cnt=[$2], day=[$0])
    +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT()])
       +- LogicalProject(day=[$4], a=[$0])
-         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], day=[DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd')])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], day=[DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd')])
            +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
 ]]>
     </Resource>
@@ -2055,7 +2055,7 @@ LogicalProject(a=[$0], a0=[$4], c-day=[$6], b=[$5], d=[$7])
 +- LogicalJoin(condition=[=($1, $5)], joinType=[inner])
    :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
-   +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+   +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
       +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
 ]]>
     </Resource>
@@ -2093,7 +2093,7 @@ LogicalProject(a=[$0], a0=[$4], c-day=[$6], b=[$5], d=[$7])
 +- LogicalJoin(condition=[=($1, $5)], joinType=[inner])
    :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
-   +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+   +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
       +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
 ]]>
     </Resource>
@@ -2133,7 +2133,7 @@ LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b,
    +- LogicalJoin(condition=[=($1, $5)], joinType=[inner])
       :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
       :  +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
-      +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+      +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
          +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
 ]]>
     </Resource>
@@ -2172,7 +2172,7 @@ LogicalProject(a=[$0], c-day=[$6], b=[$5], d=[$7])
 +- LogicalJoin(condition=[=($1, $5)], joinType=[inner])
    :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
-   +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+   +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
       +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
 ]]>
     </Resource>
@@ -2210,7 +2210,7 @@ LogicalProject(a=[$0], c-day=[$6], b=[$5], d=[$7])
 +- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
    :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
-   +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+   +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
       +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
 ]]>
     </Resource>
@@ -2250,7 +2250,7 @@ LogicalProject(a=[$0], c-day=[$6], b=[$5], d=[$7])
 +- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
    :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
-   +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+   +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
       +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
 ]]>
     </Resource>
@@ -2420,9 +2420,9 @@ LogicalSink(table=[default_catalog.default_database.sink1], fields=[a, day, EXPR
 +- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)], EXPR$3=[COUNT(DISTINCT $3)])
    +- LogicalProject(a=[$0], day=[$2], b=[$3], c=[$5])
       +- LogicalJoin(condition=[=($0, $6)], joinType=[inner])
-         :- LogicalProject(a=[$0], b=[$1], day=[DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd')])
+         :- LogicalProject(a=[$0], b=[$1], day=[DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd')])
          :  +- LogicalTableScan(table=[[default_catalog, default_database, src1]])
-         +- LogicalProject(b=[$1], day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], c=[$2], d=[$3])
+         +- LogicalProject(b=[$1], day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], c=[$2], d=[$3])
            +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
 
 LogicalSink(table=[default_catalog.default_database.sink2], fields=[a, day, b, c])
@@ -2430,9 +2430,9 @@ LogicalSink(table=[default_catalog.default_database.sink2], fields=[a, day, b, c
    +- LogicalFilter(condition=[>($2, 100)])
       +- LogicalProject(a=[$0], day=[$2], b=[$3], c=[$5])
          +- LogicalJoin(condition=[=($0, $6)], joinType=[inner])
-            :- LogicalProject(a=[$0], b=[$1], day=[DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd')])
+            :- LogicalProject(a=[$0], b=[$1], day=[DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd')])
             :  +- LogicalTableScan(table=[[default_catalog, default_database, src1]])
-            +- LogicalProject(b=[$1], day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], c=[$2], d=[$3])
+            +- LogicalProject(b=[$1], day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], c=[$2], d=[$3])
               +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
 ]]>
     </Resource>
@@ -2481,7 +2481,7 @@ group by a, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')
 LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, qmt, day])
 +- LogicalProject(a=[$0], qmt=[$2], day=[$1])
    +- LogicalAggregate(group=[{0, 1}], qmt=[SUM($2)])
-      +- LogicalProject(a=[$0], day=[DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd')], b=[$1])
+      +- LogicalProject(a=[$0], day=[DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd')], b=[$1])
          +- LogicalFilter(condition=[=($4, 1)])
            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY PROCTIME() DESC NULLS LAST)])
               +- LogicalTableScan(table=[[default_catalog, default_database, src]])
@@ -2508,7 +2508,7 @@ LogicalSink(table=[default_catalog.default_database.sink1], fields=[a, b, d])
 +- LogicalProject(a=[$0], b=[$1], d=[CAST($2):BIGINT])
    +- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)])
       +- LogicalProject(a=[$0], day=[$2], b=[$1])
-         +- LogicalProject(a=[$1], b=[$3], day=[DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd')])
+         +- LogicalProject(a=[$1], b=[$3], day=[DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd')])
            +- LogicalProject(id=[$0], a=[$1.nested2.num], name=[$1.nested1.name], b=[+(+($1.nested1.value, $1.nested2.num), $3)])
               +- LogicalProject(id=[$0], deepNested=[$1], name=[$2], metadata_1=[$3], metadata_2=[$4])
                  +- LogicalTableScan(table=[[default_catalog, default_database, nested_src, metadata=[metadata_1, metadata_2]]])
@@ -2984,7 +2984,7 @@ LogicalProject(a=[$0])
   LogicalFilter(condition=[>($1, 100)])
     LogicalTableScan(table=[[default_catalog, default_database, cdc]])
 })])
-   +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+   +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
       +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
 ]]>
     </Resource>
@@ -3022,7 +3022,7 @@ LogicalProject(a=[$0])
   LogicalFilter(condition=[>($1, 100)])
     LogicalTableScan(table=[[default_catalog, default_database, cdc]])
 })])
-   +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+   +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
       +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
 ]]>
     </Resource>
@@ -3056,7 +3056,7 @@ where t1.c in (
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
 +- LogicalFilter(condition=[IN($2, {
-LogicalProject(c=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))])
+LogicalProject(c=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))])
   LogicalFilter(condition=[>($1, 100)])
     LogicalTableScan(table=[[default_catalog, default_database, cdc]])
 })])
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/plan/calc-current-timestamp.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/plan/calc-current-timestamp.json
new file mode 100644
index 00000000000..59da2c17b3a
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/plan/calc-current-timestamp.json
@@ -0,0 +1,117 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 21,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` BIGINT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 22,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$/$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "internalName" : "$EXTRACT$1",
+        "operands" : [ {
+          "kind" : "LITERAL",
+          "symbol" : "TIME_UNIT_RANGE",
+          "value" : "YEAR",
+          "type" : {
+            "type" : "SYMBOL",
+            "nullable" : false
+          }
+        }, {
+          "kind" : "CALL",
+          "internalName" : "$CURRENT_TIMESTAMP$1",
+          "operands" : [ ],
+          "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL"
+        } ],
+        "type" : "BIGINT NOT NULL"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "BIGINT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` BIGINT>",
+    "description" : "Calc(select=[(EXTRACT(YEAR, CURRENT_TIMESTAMP()) / a) AS 
EXPR$0])"
+  }, {
+    "id" : 23,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[EXPR$0])"
+  } ],
+  "edges" : [ {
+    "source" : 21,
+    "target" : 22,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 22,
+    "target" : 23,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/savepoint/_metadata
new file mode 100644
index 00000000000..085b8798cf8
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/savepoint/_metadata differ

