This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.0 by this push:
new 53a2886b484 [FLINK-38938][table] Restore of SQL job with
`CURRENT_TIMESTAMP` fails with `CodeGenException`
53a2886b484 is described below
commit 53a2886b484a415fb8c972f87d5f2e1602675c8a
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Jan 19 12:39:14 2026 +0100
[FLINK-38938][table] Restore of SQL job with `CURRENT_TIMESTAMP` fails with
`CodeGenException`
---
.../sql/parser/validate/FlinkSqlConformance.java | 2 +-
.../functions/sql/FlinkSqlOperatorTable.java | 109 +++++--------------
.../table/planner/calcite/FlinkPlannerImpl.scala | 4 +-
.../plan/nodes/exec/common/CalcTestPrograms.java | 19 ++++
.../plan/nodes/exec/stream/CalcRestoreTest.java | 3 +-
.../nodes/exec/stream/PythonCalcJsonPlanTest.java | 13 +++
.../flink/table/planner/utils/DiffRepository.java | 12 ++-
.../plan/batch/sql/DynamicFunctionPlanTest.xml | 8 +-
.../testTimestampFunction.out | 102 ++++++++++++++++++
.../plan/stream/sql/DynamicFunctionPlanTest.xml | 8 +-
.../planner/plan/stream/sql/MatchRecognizeTest.xml | 4 +-
.../plan/stream/sql/NonDeterministicDagTest.xml | 38 +++----
.../plan/calc-current-timestamp.json | 117 +++++++++++++++++++++
.../calc-current-timestamp/savepoint/_metadata | Bin 0 -> 5868 bytes
14 files changed, 323 insertions(+), 116 deletions(-)
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
index b1235d6a42c..16a6692abc4 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
@@ -114,7 +114,7 @@ public enum FlinkSqlConformance implements SqlConformance {
@Override
public boolean allowNiladicParentheses() {
- return false;
+ return true;
}
@Override
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 1d6ff95f3e9..b565f5764b0 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -84,94 +84,39 @@ public class FlinkSqlOperatorTable extends
ReflectiveSqlOperatorTable {
// register functions based on batch or streaming mode
final FlinkSqlOperatorTable finalInstance = instance;
- dynamicFunctions(isBatchMode)
- .forEach(
- f -> {
- finalInstance.register(f);
- });
+ dynamicFunctions(isBatchMode).forEach(finalInstance::register);
cachedInstances.put(isBatchMode, finalInstance);
}
return instance;
}
public static List<SqlFunction> dynamicFunctions(boolean isBatchMode) {
- List<SqlFunction> sqlFunctions =
- Arrays.asList(
- new FlinkTimestampDynamicFunction(
- SqlStdOperatorTable.LOCALTIME.getName(),
- SqlTypeName.TIME,
- isBatchMode) {
- @Override
- public SqlSyntax getSyntax() {
- return SqlSyntax.FUNCTION;
- }
- },
- new FlinkTimestampDynamicFunction(
- SqlStdOperatorTable.LOCALTIME.getName(),
- SqlTypeName.TIME,
- isBatchMode),
- new FlinkTimestampDynamicFunction(
- SqlStdOperatorTable.CURRENT_TIME.getName(),
- SqlTypeName.TIME,
- isBatchMode) {
- @Override
- public SqlSyntax getSyntax() {
- return SqlSyntax.FUNCTION;
- }
- },
- new FlinkTimestampDynamicFunction(
- SqlStdOperatorTable.CURRENT_TIME.getName(),
- SqlTypeName.TIME,
- isBatchMode),
- new FlinkCurrentDateDynamicFunction(isBatchMode) {
- @Override
- public SqlSyntax getSyntax() {
- return SqlSyntax.FUNCTION;
- }
- },
- new FlinkCurrentDateDynamicFunction(isBatchMode),
- new FlinkTimestampWithPrecisionDynamicFunction(
- SqlStdOperatorTable.LOCALTIMESTAMP.getName(),
- SqlTypeName.TIMESTAMP,
- isBatchMode,
- 3) {
- @Override
- public SqlSyntax getSyntax() {
- return SqlSyntax.FUNCTION;
- }
- },
- new FlinkTimestampWithPrecisionDynamicFunction(
- SqlStdOperatorTable.LOCALTIMESTAMP.getName(),
- SqlTypeName.TIMESTAMP,
- isBatchMode,
- 3),
- new FlinkTimestampWithPrecisionDynamicFunction(
-
SqlStdOperatorTable.CURRENT_TIMESTAMP.getName(),
- SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
- isBatchMode,
- 3) {
- @Override
- public SqlSyntax getSyntax() {
- return SqlSyntax.FUNCTION;
- }
- },
- new FlinkTimestampWithPrecisionDynamicFunction(
-
SqlStdOperatorTable.CURRENT_TIMESTAMP.getName(),
- SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
- isBatchMode,
- 3),
- new FlinkTimestampWithPrecisionDynamicFunction(
- FlinkTimestampWithPrecisionDynamicFunction.NOW,
- SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
- isBatchMode,
- 3) {
- @Override
- public SqlSyntax getSyntax() {
- return SqlSyntax.FUNCTION;
- }
- });
-
- return sqlFunctions;
+ return List.of(
+ new FlinkTimestampDynamicFunction(
+ SqlStdOperatorTable.LOCALTIME.getName(),
SqlTypeName.TIME, isBatchMode),
+ new FlinkTimestampDynamicFunction(
+ SqlStdOperatorTable.CURRENT_TIME.getName(),
SqlTypeName.TIME, isBatchMode),
+ new FlinkCurrentDateDynamicFunction(isBatchMode),
+ new FlinkTimestampWithPrecisionDynamicFunction(
+ SqlStdOperatorTable.LOCALTIMESTAMP.getName(),
+ SqlTypeName.TIMESTAMP,
+ isBatchMode,
+ 3),
+ new FlinkTimestampWithPrecisionDynamicFunction(
+ SqlStdOperatorTable.CURRENT_TIMESTAMP.getName(),
+ SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+ isBatchMode,
+ 3),
+ new FlinkTimestampWithPrecisionDynamicFunction(
+ FlinkTimestampWithPrecisionDynamicFunction.NOW,
+ SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+ isBatchMode,
+ 3) {
+ @Override
+ public SqlSyntax getSyntax() {
+ return SqlSyntax.FUNCTION;
+ }
+ });
}
private static void validateNoDynamicFunction(FlinkSqlOperatorTable
instance)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 649ba8d90a2..9d8570848e7 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -21,6 +21,7 @@ import org.apache.flink.sql.parser.ExtendedSqlNode
import org.apache.flink.sql.parser.ddl.{SqlCompilePlan, SqlReset, SqlSet,
SqlUseModules}
import org.apache.flink.sql.parser.dml._
import org.apache.flink.sql.parser.dql._
+import org.apache.flink.sql.parser.validate.FlinkSqlConformance
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.planner.hint.FlinkHints
import org.apache.flink.table.planner.parse.CalciteParser
@@ -104,7 +105,8 @@ class FlinkPlannerImpl(
SqlValidator.Config.DEFAULT
.withIdentifierExpansion(true)
.withDefaultNullCollation(FlinkPlannerImpl.defaultNullCollation)
- .withTypeCoercionEnabled(false),
+ .withTypeCoercionEnabled(false)
+ .withConformance(FlinkSqlConformance.DEFAULT),
createToRelContext(),
cluster,
config
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
index 0ccbaccc7a0..0e5a1848faa 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
@@ -211,4 +211,23 @@ public class CalcTestPrograms {
+ "from source_t where "
+ "(udf1(a) > 0 or (a * b) < 100) and b >
10")
.build();
+
+ public static final TableTestProgram CALC_CURRENT_TIMESTAMP =
+ TableTestProgram.of(
+ "calc-current-timestamp", "validates basic calc
with current timestamp")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema("a BIGINT")
+ .producedBeforeRestore(Row.of(100L))
+ .producedAfterRestore(Row.of(10000L))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("a BIGINT")
+ .consumedBeforeRestore(Row.of(20L))
+ .consumedAfterRestore(Row.of(0L))
+ .build())
+ .runSql(
+ "INSERT INTO sink_t SELECT extract(year from
current_timestamp) / a FROM t")
+ .build();
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
index 80b5fde5e53..8de2e05e054 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
@@ -41,6 +41,7 @@ public class CalcRestoreTest extends RestoreTestBase {
CalcTestPrograms.CALC_PROJECT_PUSHDOWN,
CalcTestPrograms.CALC_SARG,
CalcTestPrograms.CALC_UDF_SIMPLE,
- CalcTestPrograms.CALC_UDF_COMPLEX);
+ CalcTestPrograms.CALC_UDF_COMPLEX,
+ CalcTestPrograms.CALC_CURRENT_TIMESTAMP);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java
index dbe3c840cb0..1b577132cf7 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java
@@ -78,4 +78,17 @@ class PythonCalcJsonPlanTest extends TableTestBase {
tEnv.executeSql(sinkTableDdl);
util.verifyJsonPlan("insert into MySink select a, b from MyTable where
pyFunc(b, b + 1)");
}
+
+ @Test
+ void testTimestampFunction() {
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " a bigint,\n"
+ + " b timestamp(3)\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'table-sink-class' = 'DEFAULT')";
+ tEnv.executeSql(sinkTableDdl);
+ util.verifyJsonPlan("insert into MySink select 1, current_timestamp");
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/DiffRepository.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/DiffRepository.java
index 5506dc6a75b..9e99e8f9043 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/DiffRepository.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/DiffRepository.java
@@ -21,7 +21,6 @@ import org.apache.calcite.avatica.util.Spaces;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
import org.apache.calcite.util.XmlOutput;
-import org.junit.jupiter.api.Assertions;
import org.opentest4j.AssertionFailedError;
import org.w3c.dom.CDATASection;
import org.w3c.dom.Comment;
@@ -48,6 +47,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.assertj.core.api.Assertions.assertThat;
+
// THIS FILE IS COPIED FROM APACHE CALCITE
/**
@@ -435,7 +436,14 @@ public class DiffRepository {
// for largish snippets
String expected2Canonical =
expected2.replace(Util.LINE_SEPARATOR, "\n");
String actualCanonical = actual.replace(Util.LINE_SEPARATOR,
"\n");
- Assertions.assertEquals(expected2Canonical, actualCanonical,
tag);
+ assertThat(actualCanonical)
+ .withFailMessage(
+ () ->
+ "Expected: "
+ + expected2Canonical
+ + " but was: "
+ + actualCanonical)
+ .isEqualTo(expected2Canonical);
} catch (AssertionFailedError e) {
amend(testCaseName, expected, actual);
throw e;
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFunctionPlanTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFunctionPlanTest.xml
index c35914e6ebc..1405e31e772 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFunctionPlanTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFunctionPlanTest.xml
@@ -28,7 +28,7 @@ GROUP BY cat, gmt_date]]>
<![CDATA[
LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)], EXPR$3=[COUNT()])
+- LogicalProject(cat=[$2], gmt_date=[$3], cnt=[$4])
- +- LogicalFilter(condition=[=($3, CURRENT_DATE())])
+ +- LogicalFilter(condition=[=($3, CURRENT_DATE)])
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
]]>
</Resource>
@@ -53,7 +53,7 @@ HashAggregate(isMerge=[false], groupBy=[cat],
auxGrouping=[gmt_date], select=[ca
<![CDATA[
LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4], ts=[$5],
hh=[$6])
+- LogicalFilter(condition=[AND(>(CAST($6):BIGINT, 12), LIKE($2,
_UTF-16LE'fruit%'))])
- +- LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4],
ts=[$5], hh=[SUBSTR(CAST(LOCALTIME()):VARCHAR(2147483647) CHARACTER SET
"UTF-16LE" NOT NULL, 1, 2)])
+ +- LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4],
ts=[$5], hh=[SUBSTR(CAST(LOCALTIME):VARCHAR(2147483647) CHARACTER SET
"UTF-16LE" NOT NULL, 1, 2)])
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
]]>
</Resource>
@@ -77,7 +77,7 @@ GROUP BY cat, hh]]>
LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)], EXPR$3=[COUNT()])
+- LogicalProject(cat=[$2], hh=[$6], cnt=[$4])
+- LogicalFilter(condition=[=(SUBSTR(CAST($5):VARCHAR(2147483647) CHARACTER
SET "UTF-16LE", 1, 2), $6)])
- +- LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4],
ts=[$5], hh=[SUBSTR(CAST(LOCALTIME()):VARCHAR(2147483647) CHARACTER SET
"UTF-16LE" NOT NULL, 1, 2)])
+ +- LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4],
ts=[$5], hh=[SUBSTR(CAST(LOCALTIME):VARCHAR(2147483647) CHARACTER SET
"UTF-16LE" NOT NULL, 1, 2)])
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
]]>
</Resource>
@@ -104,7 +104,7 @@ GROUP BY gmt_date, ts, cat]]>
LogicalProject(gmt_date=[$0], ts=[$1], cat=[$2],
EXPR$3=[SUBSTR(CAST($1):VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 1, 2)],
EXPR$4=[$3])
+- LogicalAggregate(group=[{0, 1, 2}], EXPR$4=[SUM($3)])
+- LogicalProject(gmt_date=[$3], ts=[$5], cat=[$2], cnt=[$4])
- +- LogicalFilter(condition=[AND(=($3, CURRENT_DATE()), =($2,
_UTF-16LE'fruit'), =($5, CURRENT_TIME()))])
+ +- LogicalFilter(condition=[AND(=($3, CURRENT_DATE), =($2,
_UTF-16LE'fruit'), =($5, CURRENT_TIME))])
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
]]>
</Resource>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testTimestampFunction.out
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testTimestampFunction.out
new file mode 100644
index 00000000000..0093419b98d
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testTimestampFunction.out
@@ -0,0 +1,102 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-values_1",
+ "tuples" : [ [ {
+ "kind" : "LITERAL",
+ "value" : 0,
+ "type" : "INT NOT NULL"
+ } ] ],
+ "outputType" : "ROW<`ZERO` INT NOT NULL>",
+ "description" : "Values(tuples=[[{ 0 }]])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "LITERAL",
+ "value" : 1,
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "FUNCTION_ID",
+ "internalName" : "$CURRENT_TIMESTAMP$1",
+ "operands" : [ ],
+ "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT NOT NULL, `b` TIMESTAMP(3)>",
+ "description" : "Calc(select=[1 AS a, CAST(CURRENT_TIMESTAMP() AS
TIMESTAMP(3)) AS b])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "TIMESTAMP(3)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values",
+ "table-sink-class" : "DEFAULT"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT NOT NULL, `b` TIMESTAMP(3)>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink],
fields=[a, b])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DynamicFunctionPlanTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DynamicFunctionPlanTest.xml
index bc661188560..15d493d32f8 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DynamicFunctionPlanTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DynamicFunctionPlanTest.xml
@@ -28,7 +28,7 @@ GROUP BY cat, gmt_date]]>
<![CDATA[
LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)], EXPR$3=[COUNT()])
+- LogicalProject(cat=[$2], gmt_date=[$3], cnt=[$4])
- +- LogicalFilter(condition=[=($3, CURRENT_DATE())])
+ +- LogicalFilter(condition=[=($3, CURRENT_DATE)])
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
]]>
</Resource>
@@ -53,7 +53,7 @@ GroupAggregate(groupBy=[cat, gmt_date], select=[cat,
gmt_date, SUM(cnt) AS EXPR$
<![CDATA[
LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4], ts=[$5],
hh=[$6])
+- LogicalFilter(condition=[AND(>(CAST($6):BIGINT, 12), LIKE($2,
_UTF-16LE'fruit%'))])
- +- LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4],
ts=[$5], hh=[SUBSTR(CAST(LOCALTIME()):VARCHAR(2147483647) CHARACTER SET
"UTF-16LE" NOT NULL, 1, 2)])
+ +- LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4],
ts=[$5], hh=[SUBSTR(CAST(LOCALTIME):VARCHAR(2147483647) CHARACTER SET
"UTF-16LE" NOT NULL, 1, 2)])
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
]]>
</Resource>
@@ -78,7 +78,7 @@ GROUP BY cat, hh]]>
LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)], EXPR$3=[COUNT()])
+- LogicalProject(cat=[$2], hh=[$6], cnt=[$4])
+- LogicalFilter(condition=[=(SUBSTR(CAST($5):VARCHAR(2147483647) CHARACTER
SET "UTF-16LE", 1, 2), $6)])
- +- LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4],
ts=[$5], hh=[SUBSTR(CAST(LOCALTIME()):VARCHAR(2147483647) CHARACTER SET
"UTF-16LE" NOT NULL, 1, 2)])
+ +- LogicalProject(a=[$0], b=[$1], cat=[$2], gmt_date=[$3], cnt=[$4],
ts=[$5], hh=[SUBSTR(CAST(LOCALTIME):VARCHAR(2147483647) CHARACTER SET
"UTF-16LE" NOT NULL, 1, 2)])
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
]]>
</Resource>
@@ -106,7 +106,7 @@ GROUP BY gmt_date, ts, cat]]>
LogicalProject(gmt_date=[$0], ts=[$1], cat=[$2],
EXPR$3=[SUBSTR(CAST($1):VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 1, 2)],
EXPR$4=[$3])
+- LogicalAggregate(group=[{0, 1, 2}], EXPR$4=[SUM($3)])
+- LogicalProject(gmt_date=[$3], ts=[$5], cat=[$2], cnt=[$4])
- +- LogicalFilter(condition=[AND(=($3, CURRENT_DATE()), =($2,
_UTF-16LE'fruit'), =($5, CURRENT_TIME()))])
+ +- LogicalFilter(condition=[AND(=($3, CURRENT_DATE), =($2,
_UTF-16LE'fruit'), =($5, CURRENT_TIME))])
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
]]>
</Resource>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.xml
index 0c9ca22456c..0c272d99242 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.xml
@@ -56,7 +56,7 @@ MATCH_RECOGNIZE (
<Resource name="ast">
<![CDATA[
LogicalProject(symbol=[$0], dPrice=[$1], matchRowtime=[$2])
-+- LogicalMatch(partition=[[0]], order=[[1 ASC-nulls-first]],
outputFields=[[symbol, dPrice, matchRowtime]], allRows=[false],
after=[FLAG(SKIP TO NEXT ROW)], pattern=[_UTF-16LE'A'], isStrictStarts=[false],
isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>=(PREV(A.$1, 0),
-(CURRENT_TIMESTAMP(), 86400000:INTERVAL DAY))]], inputFields=[[symbol,
matchRowtime, price, startTime]])
++- LogicalMatch(partition=[[0]], order=[[1 ASC-nulls-first]],
outputFields=[[symbol, dPrice, matchRowtime]], allRows=[false],
after=[FLAG(SKIP TO NEXT ROW)], pattern=[_UTF-16LE'A'], isStrictStarts=[false],
isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>=(PREV(A.$1, 0),
-(CURRENT_TIMESTAMP, 86400000:INTERVAL DAY))]], inputFields=[[symbol,
matchRowtime, price, startTime]])
+- LogicalProject(symbol=[$0], matchRowtime=[$1], price=[$2],
startTime=[TUMBLE_START($3)])
+- LogicalAggregate(group=[{0, 1, 2, 3}])
+- LogicalProject(symbol=[$0], matchRowtime=[$3], price=[$1],
$f3=[$TUMBLE($3, 3000:INTERVAL SECOND)])
@@ -67,7 +67,7 @@ LogicalProject(symbol=[$0], dPrice=[$1], matchRowtime=[$2])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Match(partitionBy=[symbol], orderBy=[matchRowtime ASC],
measures=[FINAL(A.price) AS dPrice, FINAL(A.matchRowtime) AS matchRowtime],
rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW],
pattern=[_UTF-16LE'A'], define=[{A=>=(PREV(A.$1, 0), -(CURRENT_TIMESTAMP(),
86400000:INTERVAL DAY))}])
+Match(partitionBy=[symbol], orderBy=[matchRowtime ASC],
measures=[FINAL(A.price) AS dPrice, FINAL(A.matchRowtime) AS matchRowtime],
rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW],
pattern=[_UTF-16LE'A'], define=[{A=>=(PREV(A.$1, 0), -(CURRENT_TIMESTAMP,
86400000:INTERVAL DAY))}])
+- Exchange(distribution=[hash[symbol]])
+- Calc(select=[symbol, matchRowtime, price, w$start AS startTime])
+- GroupWindowAggregate(groupBy=[symbol, price, matchRowtime],
window=[TumblingGroupWindow('w$, matchRowtime0, 3000)], properties=[w$start,
w$end, w$rowtime, w$proctime], select=[symbol, price, matchRowtime, start('w$)
AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS
w$proctime])
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
index 4a7f213d81f..74aff5b27f4 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
@@ -165,7 +165,7 @@ LogicalProject(a=[$0])
LogicalFilter(condition=[>($1, 100)])
LogicalTableScan(table=[[default_catalog, default_database, cdc]])
}))])
- +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+ +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
+- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
]]>
</Resource>
@@ -203,7 +203,7 @@ LogicalProject(a=[$0])
LogicalFilter(condition=[>($1, 100)])
LogicalTableScan(table=[[default_catalog, default_database, cdc]])
}))])
- +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+ +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
+- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
]]>
</Resource>
@@ -1958,7 +1958,7 @@ group by `day`
<![CDATA[
LogicalAggregate(group=[{0}], cnt=[COUNT()], qmt=[SUM($1)])
+- LogicalProject(day=[$4], b=[$1])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3],
day=[CONCAT(CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL,
DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3],
day=[CONCAT(CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL,
DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))])
+- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
]]>
</Resource>
@@ -1989,7 +1989,7 @@
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, cn
+- LogicalProject(a=[$1], cnt=[$2], day=[$0])
+- LogicalAggregate(group=[{0, 1}], cnt=[COUNT()])
+- LogicalProject(day=[$4], a=[$0])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3],
day=[DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd')])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3],
day=[DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd')])
+- LogicalTableScan(table=[[default_catalog, default_database,
cdc]])
]]>
</Resource>
@@ -2055,7 +2055,7 @@ LogicalProject(a=[$0], a0=[$4], c-day=[$6], b=[$5],
d=[$7])
+- LogicalJoin(condition=[=($1, $5)], joinType=[inner])
:- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
: +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
- +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+ +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
+- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
]]>
</Resource>
@@ -2093,7 +2093,7 @@ LogicalProject(a=[$0], a0=[$4], c-day=[$6], b=[$5],
d=[$7])
+- LogicalJoin(condition=[=($1, $5)], joinType=[inner])
:- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
: +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
- +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+ +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
+- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
]]>
</Resource>
@@ -2133,7 +2133,7 @@
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b,
+- LogicalJoin(condition=[=($1, $5)], joinType=[inner])
:- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
: +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
- +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+ +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
+- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
]]>
</Resource>
@@ -2172,7 +2172,7 @@ LogicalProject(a=[$0], c-day=[$6], b=[$5], d=[$7])
+- LogicalJoin(condition=[=($1, $5)], joinType=[inner])
:- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
: +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
- +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+ +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
+- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
]]>
</Resource>
@@ -2210,7 +2210,7 @@ LogicalProject(a=[$0], c-day=[$6], b=[$5], d=[$7])
+- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
:- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
: +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
- +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+ +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
+- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
]]>
</Resource>
@@ -2250,7 +2250,7 @@ LogicalProject(a=[$0], c-day=[$6], b=[$5], d=[$7])
+- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
:- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
: +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
- +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+ +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
+- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
]]>
</Resource>
@@ -2420,9 +2420,9 @@
LogicalSink(table=[default_catalog.default_database.sink1], fields=[a, day, EXPR
+- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)], EXPR$3=[COUNT(DISTINCT
$3)])
+- LogicalProject(a=[$0], day=[$2], b=[$3], c=[$5])
+- LogicalJoin(condition=[=($0, $6)], joinType=[inner])
- :- LogicalProject(a=[$0], b=[$1],
day=[DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd')])
+ :- LogicalProject(a=[$0], b=[$1], day=[DATE_FORMAT(CURRENT_TIMESTAMP,
_UTF-16LE'yyMMdd')])
: +- LogicalTableScan(table=[[default_catalog, default_database,
src1]])
- +- LogicalProject(b=[$1], day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], c=[$2], d=[$3])
+ +- LogicalProject(b=[$1], day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], c=[$2], d=[$3])
+- LogicalTableScan(table=[[default_catalog, default_database,
src2]])
LogicalSink(table=[default_catalog.default_database.sink2], fields=[a, day, b,
c])
@@ -2430,9 +2430,9 @@
LogicalSink(table=[default_catalog.default_database.sink2], fields=[a, day, b, c
+- LogicalFilter(condition=[>($2, 100)])
+- LogicalProject(a=[$0], day=[$2], b=[$3], c=[$5])
+- LogicalJoin(condition=[=($0, $6)], joinType=[inner])
- :- LogicalProject(a=[$0], b=[$1],
day=[DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd')])
+ :- LogicalProject(a=[$0], b=[$1],
day=[DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd')])
: +- LogicalTableScan(table=[[default_catalog, default_database,
src1]])
- +- LogicalProject(b=[$1], day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], c=[$2], d=[$3])
+ +- LogicalProject(b=[$1], day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], c=[$2], d=[$3])
+- LogicalTableScan(table=[[default_catalog, default_database,
src2]])
]]>
</Resource>
@@ -2481,7 +2481,7 @@ group by a, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a,
qmt, day])
+- LogicalProject(a=[$0], qmt=[$2], day=[$1])
+- LogicalAggregate(group=[{0, 1}], qmt=[SUM($2)])
- +- LogicalProject(a=[$0], day=[DATE_FORMAT(CURRENT_TIMESTAMP(),
_UTF-16LE'yyMMdd')], b=[$1])
+ +- LogicalProject(a=[$0], day=[DATE_FORMAT(CURRENT_TIMESTAMP,
_UTF-16LE'yyMMdd')], b=[$1])
+- LogicalFilter(condition=[=($4, 1)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rn=[ROW_NUMBER()
OVER (PARTITION BY $0 ORDER BY PROCTIME() DESC NULLS LAST)])
+- LogicalTableScan(table=[[default_catalog, default_database,
src]])
@@ -2508,7 +2508,7 @@
LogicalSink(table=[default_catalog.default_database.sink1], fields=[a, b, d])
+- LogicalProject(a=[$0], b=[$1], d=[CAST($2):BIGINT])
+- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)])
+- LogicalProject(a=[$0], day=[$2], b=[$1])
- +- LogicalProject(a=[$1], b=[$3],
day=[DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd')])
+ +- LogicalProject(a=[$1], b=[$3], day=[DATE_FORMAT(CURRENT_TIMESTAMP,
_UTF-16LE'yyMMdd')])
+- LogicalProject(id=[$0], a=[$1.nested2.num],
name=[$1.nested1.name], b=[+(+($1.nested1.value, $1.nested2.num), $3)])
+- LogicalProject(id=[$0], deepNested=[$1], name=[$2],
metadata_1=[$3], metadata_2=[$4])
+- LogicalTableScan(table=[[default_catalog,
default_database, nested_src, metadata=[metadata_1, metadata_2]]])
@@ -2984,7 +2984,7 @@ LogicalProject(a=[$0])
LogicalFilter(condition=[>($1, 100)])
LogicalTableScan(table=[[default_catalog, default_database, cdc]])
})])
- +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+ +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
+- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
]]>
</Resource>
@@ -3022,7 +3022,7 @@ LogicalProject(a=[$0])
LogicalFilter(condition=[>($1, 100)])
LogicalTableScan(table=[[default_catalog, default_database, cdc]])
})])
- +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd'))], d=[$3])
+ +- LogicalProject(a=[$0], b=[$1], c-day=[CONCAT($2,
DATE_FORMAT(CURRENT_TIMESTAMP, _UTF-16LE'yyMMdd'))], d=[$3])
+- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
]]>
</Resource>
@@ -3056,7 +3056,7 @@ where t1.c in (
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
+- LogicalFilter(condition=[IN($2, {
-LogicalProject(c=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP(),
_UTF-16LE'yyMMdd'))])
+LogicalProject(c=[CONCAT($2, DATE_FORMAT(CURRENT_TIMESTAMP,
_UTF-16LE'yyMMdd'))])
LogicalFilter(condition=[>($1, 100)])
LogicalTableScan(table=[[default_catalog, default_database, cdc]])
})])
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/plan/calc-current-timestamp.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/plan/calc-current-timestamp.json
new file mode 100644
index 00000000000..59da2c17b3a
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/plan/calc-current-timestamp.json
@@ -0,0 +1,117 @@
+{
+ "flinkVersion" : "2.0",
+ "nodes" : [ {
+ "id" : 21,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`a` BIGINT>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, t]], fields=[a])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 22,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$/$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "internalName" : "$EXTRACT$1",
+ "operands" : [ {
+ "kind" : "LITERAL",
+ "symbol" : "TIME_UNIT_RANGE",
+ "value" : "YEAR",
+ "type" : {
+ "type" : "SYMBOL",
+ "nullable" : false
+ }
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$CURRENT_TIMESTAMP$1",
+ "operands" : [ ],
+ "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL"
+ } ],
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` BIGINT>",
+ "description" : "Calc(select=[(EXTRACT(YEAR, CURRENT_TIMESTAMP()) / a) AS
EXPR$0])"
+ }, {
+ "id" : 23,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[EXPR$0])"
+ } ],
+ "edges" : [ {
+ "source" : 21,
+ "target" : 22,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 22,
+ "target" : 23,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/savepoint/_metadata
new file mode 100644
index 00000000000..085b8798cf8
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-current-timestamp/savepoint/_metadata
differ