This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f751a00fd6f [FLINK-33758] Implement restore tests for TemporalSort node (#23879)
f751a00fd6f is described below

commit f751a00fd6f0e70187d2a9ae2ccd6a728d9a2c64
Author: James Hughes <jhug...@confluent.io>
AuthorDate: Thu Dec 7 07:51:40 2023 -0500

    [FLINK-33758] Implement restore tests for TemporalSort node (#23879)
---
 .../exec/stream/TemporalSortJsonPlanTest.java      |  68 --------
 .../nodes/exec/stream/TemporalSortRestoreTest.java |  40 +++++
 .../exec/stream/TemporalSortTestPrograms.java      |  96 +++++++++++
 .../stream/jsonplan/TemporalSortJsonITCase.java    |  87 ----------
 .../plan/temporal-sort-proctime.json}              | 180 +++------------------
 .../temporal-sort-proctime/savepoint/_metadata     | Bin 0 -> 6037 bytes
 .../plan/temporal-sort-rowtime.json}               | 144 +++++++----------
 .../temporal-sort-rowtime/savepoint/_metadata      | Bin 0 -> 10729 bytes
 8 files changed, 210 insertions(+), 405 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest.java
deleted file mode 100644
index 8ebd6a47e59..00000000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/** Test json serialization/deserialization for temporal sort. */
-class TemporalSortJsonPlanTest extends TableTestBase {
-
-    private StreamTableTestUtil util;
-
-    @BeforeEach
-    void setup() {
-        util = streamTestUtil(TableConfig.getDefault());
-        TableEnvironment tEnv = util.getTableEnv();
-
-        String srcTableDdl =
-                "CREATE TABLE MyTable (\n"
-                        + " a INT,\n"
-                        + " b BIGINT,\n"
-                        + " c VARCHAR,\n"
-                        + " `rowtime` AS TO_TIMESTAMP(c),\n"
-                        + " proctime as PROCTIME(),\n"
-                        + " WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(srcTableDdl);
-
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " a INT\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-    }
-
-    @Test
-    void testSortProcessingTime() {
-        util.verifyJsonPlan("insert into MySink SELECT a FROM MyTable order by proctime, c");
-    }
-
-    @Test
-    void testSortRowTime() {
-        util.verifyJsonPlan("insert into MySink SELECT a FROM MyTable order by rowtime, c");
-    }
-}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortRestoreTest.java
new file mode 100644
index 00000000000..f6a1b0fcea9
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortRestoreTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecTemporalSort}. */
+public class TemporalSortRestoreTest extends RestoreTestBase {
+
+    public TemporalSortRestoreTest() {
+        super(StreamExecTemporalSort.class);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return Arrays.asList(
+                TemporalSortTestPrograms.TEMPORAL_SORT_PROCTIME,
+                TemporalSortTestPrograms.TEMPORAL_SORT_ROWTIME);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortTestPrograms.java
new file mode 100644
index 00000000000..c340435c7a2
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortTestPrograms.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecTemporalSort}. */
+public class TemporalSortTestPrograms {
+
+    static final Row[] BEFORE_DATA = {
+        Row.of("2020-10-10 00:00:01", 1, 1d),
+        Row.of("2020-10-10 00:00:02", 2, 2d),
+        Row.of("2020-10-10 00:00:07", 5, 6d),
+        Row.of("2020-10-10 00:00:07", 3, 3d),
+        // out of order
+        Row.of("2020-10-10 00:00:06", 6, 6d),
+        Row.of("2020-10-10 00:00:08", 3, null),
+        // late event
+        Row.of("2020-10-10 00:00:04", 5, 5d),
+        Row.of("2020-10-10 00:00:16", 4, 4d)
+    };
+
+    static final Row[] AFTER_DATA = {
+        Row.of("2020-10-10 00:00:40", 10, 3d), Row.of("2020-10-10 00:00:42", 11, 4d)
+    };
+    static final TableTestProgram TEMPORAL_SORT_PROCTIME =
+            TableTestProgram.of(
+                            "temporal-sort-proctime", "validates temporal sort node with proctime")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(
+                                            "a INT",
+                                            "b BIGINT",
+                                            "c STRING",
+                                            "`proctime` as PROCTIME()")
+                                    .producedBeforeRestore(
+                                            Row.of(1, 1L, "Hi"),
+                                            Row.of(2, 2L, "Hello"),
+                                            Row.of(3, 2L, "Hello world"))
+                                    .producedAfterRestore(
+                                            Row.of(4, 1L, "Guten Morgen"),
+                                            Row.of(5, 2L, "Guten Tag"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a INT")
+                                    .consumedBeforeRestore("+I[1]", "+I[2]", "+I[3]")
+                                    .consumedAfterRestore("+I[4]", "+I[5]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a from source_t ORDER BY proctime")
+                    .build();
+
+    static final TableTestProgram TEMPORAL_SORT_ROWTIME =
+            TableTestProgram.of(
+                            "temporal-sort-rowtime", "validates temporal sort node with rowtime")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(
+                                            "ts STRING",
+                                            "`int` INT",
+                                            "`double` DOUBLE",
+                                            "`rowtime` AS TO_TIMESTAMP(`ts`)",
+                                            "WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND")
+                                    .producedBeforeRestore(BEFORE_DATA)
+                                    .producedAfterRestore(AFTER_DATA)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a INT")
+                                    .consumedBeforeRestore(
+                                            "+I[1]", "+I[2]", "+I[6]", "+I[3]", "+I[5]", "+I[3]")
+                                    .consumedAfterRestore("+I[4]", "+I[10]", "+I[11]")
+                                    .build())
+                    .runSql(
+                            "insert into sink_t SELECT `int` FROM source_t order by rowtime, `double`")
+                    .build();
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java
deleted file mode 100644
index fe623816f1a..00000000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.runtime.stream.jsonplan;
-
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.table.planner.runtime.utils.TestData;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for temporal sort json plan. */
-class TemporalSortJsonITCase extends JsonPlanTestBase {
-
-    @Test
-    void testSortProcessingTime() throws Exception {
-        createTestValuesSourceTable(
-                "MyTable",
-                JavaScalaConversionUtil.toJava(TestData.smallData3()),
-                "a INT",
-                "b BIGINT",
-                "c STRING",
-                "proctime as PROCTIME()");
-        createTestValuesSinkTable("MySink", "a INT");
-        compileSqlAndExecutePlan("insert into MySink SELECT a FROM MyTable order by proctime")
-                .await();
-
-        assertResult(
-                Arrays.asList("+I[1]", "+I[2]", "+I[3]"),
-                TestValuesTableFactory.getResultsAsStrings("MySink"));
-    }
-
-    @Test
-    void testSortRowTime() throws Exception {
-        createTestValuesSourceTable(
-                "MyTable",
-                JavaScalaConversionUtil.toJava(TestData.windowDataWithTimestamp()),
-                new String[] {
-                    "ts STRING",
-                    "`int` INT",
-                    "`double` DOUBLE",
-                    "`float` FLOAT",
-                    "`bigdec` DECIMAL(10, 2)",
-                    "`string` STRING",
-                    "`name` STRING",
-                    "`rowtime` AS TO_TIMESTAMP(`ts`)",
-                    "WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND",
-                },
-                new HashMap<String, String>() {
-                    {
-                        put("enable-watermark-push-down", "true");
-                        put("failing-source", "true");
-                    }
-                });
-        createTestValuesSinkTable("MySink", "`int` INT");
-        compileSqlAndExecutePlan(
-                        "insert into MySink SELECT `int` FROM MyTable order by rowtime, `double`")
-                .await();
-
-        assertThat(TestValuesTableFactory.getResultsAsStrings("MySink"))
-                .isEqualTo(
-                        Arrays.asList(
-                                "+I[1]", "+I[2]", "+I[2]", "+I[5]", "+I[6]", "+I[3]", "+I[3]",
-                                "+I[4]", "+I[7]", "+I[1]"));
-    }
-}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortProcessingTime.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-proctime/plan/temporal-sort-proctime.json
similarity index 53%
rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortProcessingTime.out
rename to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-proctime/plan/temporal-sort-proctime.json
index bb931b580c5..733ce6266f7 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortProcessingTime.out
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-proctime/plan/temporal-sort-proctime.json
@@ -1,11 +1,11 @@
 {
-  "flinkVersion" : "",
+  "flinkVersion" : "1.19",
   "nodes" : [ {
     "id" : 1,
     "type" : "stream-exec-table-source-scan_1",
     "scanTableSource" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
@@ -17,22 +17,6 @@
             }, {
               "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "rowtime",
-              "kind" : "COMPUTED",
-              "expression" : {
-                "rexNode" : {
-                  "kind" : "CALL",
-                  "internalName" : "$TO_TIMESTAMP$1",
-                  "operands" : [ {
-                    "kind" : "INPUT_REF",
-                    "inputIndex" : 2,
-                    "type" : "VARCHAR(2147483647)"
-                  } ],
-                  "type" : "TIMESTAMP(3)"
-                },
-                "serializableString" : "TO_TIMESTAMP(`c`)"
-              }
             }, {
               "name" : "proctime",
               "kind" : "COMPUTED",
@@ -51,46 +35,23 @@
                 "serializableString" : "PROCTIME()"
               }
             } ],
-            "watermarkSpecs" : [ {
-              "rowtimeAttribute" : "rowtime",
-              "expression" : {
-                "rexNode" : {
-                  "kind" : "CALL",
-                  "syntax" : "SPECIAL",
-                  "internalName" : "$-$1",
-                  "operands" : [ {
-                    "kind" : "INPUT_REF",
-                    "inputIndex" : 3,
-                    "type" : "TIMESTAMP(3)"
-                  }, {
-                    "kind" : "LITERAL",
-                    "value" : "1000",
-                    "type" : "INTERVAL SECOND(6) NOT NULL"
-                  } ],
-                  "type" : "TIMESTAMP(3)"
-                },
-                "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
-              }
-            } ]
+            "watermarkSpecs" : [ ]
           },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values"
-          }
+          "partitionKeys" : [ ]
         }
       },
       "abilities" : [ {
         "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 2 ] ],
-        "producedType" : "ROW<`a` INT, `c` VARCHAR(2147483647)> NOT NULL"
+        "projectedFields" : [ [ 0 ] ],
+        "producedType" : "ROW<`a` INT> NOT NULL"
       }, {
         "type" : "ReadingMetadata",
         "metadataKeys" : [ ],
-        "producedType" : "ROW<`a` INT, `c` VARCHAR(2147483647)> NOT NULL"
+        "producedType" : "ROW<`a` INT> NOT NULL"
       } ]
     },
-    "outputType" : "ROW<`a` INT, `c` VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c], metadata=[]]], fields=[a, c])",
+    "outputType" : "ROW<`a` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[a], metadata=[]]], fields=[a])",
     "inputProperties" : [ ]
   }, {
     "id" : 2,
@@ -109,19 +70,6 @@
         "precision" : 3,
         "kind" : "PROCTIME"
       }
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "VARCHAR(2147483647)"
-    }, {
-      "kind" : "CALL",
-      "internalName" : "$TO_TIMESTAMP$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 1,
-        "type" : "VARCHAR(2147483647)"
-      } ],
-      "type" : "TIMESTAMP(3)"
     } ],
     "condition" : null,
     "inputProperties" : [ {
@@ -144,69 +92,11 @@
           "precision" : 3,
           "kind" : "PROCTIME"
         }
-      }, {
-        "name" : "c",
-        "fieldType" : "VARCHAR(2147483647)"
-      }, {
-        "name" : "rowtime",
-        "fieldType" : "TIMESTAMP(3)"
       } ]
     },
-    "description" : "Calc(select=[a, PROCTIME() AS proctime, c, TO_TIMESTAMP(c) AS rowtime])"
+    "description" : "Calc(select=[a, PROCTIME() AS proctime])"
   }, {
     "id" : 3,
-    "type" : "stream-exec-watermark-assigner_1",
-    "watermarkExpr" : {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$-$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 3,
-        "type" : "TIMESTAMP(3)"
-      }, {
-        "kind" : "LITERAL",
-        "value" : "1000",
-        "type" : "INTERVAL SECOND(6) NOT NULL"
-      } ],
-      "type" : "TIMESTAMP(3)"
-    },
-    "rowtimeFieldIndex" : 3,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "a",
-        "fieldType" : "INT"
-      }, {
-        "name" : "proctime",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
-          "nullable" : false,
-          "precision" : 3,
-          "kind" : "PROCTIME"
-        }
-      }, {
-        "name" : "c",
-        "fieldType" : "VARCHAR(2147483647)"
-      }, {
-        "name" : "rowtime",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      } ]
-    },
-    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])"
-  }, {
-    "id" : 4,
     "type" : "stream-exec-exchange_1",
     "inputProperties" : [ {
       "requiredDistribution" : {
@@ -228,31 +118,17 @@
           "precision" : 3,
           "kind" : "PROCTIME"
         }
-      }, {
-        "name" : "c",
-        "fieldType" : "VARCHAR(2147483647)"
-      }, {
-        "name" : "rowtime",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
       } ]
     },
     "description" : "Exchange(distribution=[single])"
   }, {
-    "id" : 5,
+    "id" : 4,
     "type" : "stream-exec-temporal-sort_1",
     "orderBy" : {
       "fields" : [ {
         "index" : 1,
         "isAscending" : true,
         "nullIsLast" : false
-      }, {
-        "index" : 2,
-        "isAscending" : true,
-        "nullIsLast" : false
       } ]
     },
     "inputProperties" : [ {
@@ -275,21 +151,11 @@
           "precision" : 3,
           "kind" : "PROCTIME"
         }
-      }, {
-        "name" : "c",
-        "fieldType" : "VARCHAR(2147483647)"
-      }, {
-        "name" : "rowtime",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
       } ]
     },
-    "description" : "TemporalSort(orderBy=[proctime ASC, c ASC])"
+    "description" : "TemporalSort(orderBy=[proctime ASC])"
   }, {
-    "id" : 6,
+    "id" : 5,
     "type" : "stream-exec-calc_1",
     "projection" : [ {
       "kind" : "INPUT_REF",
@@ -307,7 +173,7 @@
     "outputType" : "ROW<`a` INT>",
     "description" : "Calc(select=[a])"
   }, {
-    "id" : 7,
+    "id" : 6,
     "type" : "stream-exec-sink_1",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
@@ -318,7 +184,7 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
@@ -327,10 +193,7 @@
             } ],
             "watermarkSpecs" : [ ]
           },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values"
-          }
+          "partitionKeys" : [ ]
         }
       }
     },
@@ -343,7 +206,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`a` INT>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a])"
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a])"
   } ],
   "edges" : [ {
     "source" : 1,
@@ -380,12 +243,5 @@
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 6,
-    "target" : 7,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
   } ]
-}
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-proctime/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-proctime/savepoint/_metadata
new file mode 100644
index 00000000000..4bfe20a0264
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-proctime/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortRowTime.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-rowtime/plan/temporal-sort-rowtime.json
similarity index 70%
rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortRowTime.out
rename to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-rowtime/plan/temporal-sort-rowtime.json
index 8db2bd2bfe3..345f9904a82 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortRowTime.out
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-rowtime/plan/temporal-sort-rowtime.json
@@ -1,22 +1,22 @@
 {
-  "flinkVersion" : "",
+  "flinkVersion" : "1.19",
   "nodes" : [ {
-    "id" : 1,
+    "id" : 7,
     "type" : "stream-exec-table-source-scan_1",
     "scanTableSource" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
-              "name" : "a",
-              "dataType" : "INT"
+              "name" : "ts",
+              "dataType" : "VARCHAR(2147483647)"
             }, {
-              "name" : "b",
-              "dataType" : "BIGINT"
+              "name" : "int",
+              "dataType" : "INT"
             }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
+              "name" : "double",
+              "dataType" : "DOUBLE"
             }, {
               "name" : "rowtime",
               "kind" : "COMPUTED",
@@ -26,29 +26,12 @@
                   "internalName" : "$TO_TIMESTAMP$1",
                   "operands" : [ {
                     "kind" : "INPUT_REF",
-                    "inputIndex" : 2,
+                    "inputIndex" : 0,
                     "type" : "VARCHAR(2147483647)"
                   } ],
                   "type" : "TIMESTAMP(3)"
                 },
-                "serializableString" : "TO_TIMESTAMP(`c`)"
-              }
-            }, {
-              "name" : "proctime",
-              "kind" : "COMPUTED",
-              "expression" : {
-                "rexNode" : {
-                  "kind" : "CALL",
-                  "internalName" : "$PROCTIME$1",
-                  "operands" : [ ],
-                  "type" : {
-                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
-                    "nullable" : false,
-                    "precision" : 3,
-                    "kind" : "PROCTIME"
-                  }
-                },
-                "serializableString" : "PROCTIME()"
+                "serializableString" : "TO_TIMESTAMP(`ts`)"
               }
             } ],
             "watermarkSpecs" : [ {
@@ -73,45 +56,33 @@
               }
             } ]
           },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values"
-          }
+          "partitionKeys" : [ ]
         }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 2 ] ],
-        "producedType" : "ROW<`a` INT, `c` VARCHAR(2147483647)> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`a` INT, `c` VARCHAR(2147483647)> NOT NULL"
-      } ]
+      }
     },
-    "outputType" : "ROW<`a` INT, `c` VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c], metadata=[]]], fields=[a, c])",
+    "outputType" : "ROW<`ts` VARCHAR(2147483647), `int` INT, `double` DOUBLE>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[ts, int, double])",
     "inputProperties" : [ ]
   }, {
-    "id" : 2,
+    "id" : 8,
     "type" : "stream-exec-calc_1",
     "projection" : [ {
       "kind" : "INPUT_REF",
-      "inputIndex" : 0,
+      "inputIndex" : 1,
       "type" : "INT"
     }, {
       "kind" : "CALL",
       "internalName" : "$TO_TIMESTAMP$1",
       "operands" : [ {
         "kind" : "INPUT_REF",
-        "inputIndex" : 1,
+        "inputIndex" : 0,
         "type" : "VARCHAR(2147483647)"
       } ],
       "type" : "TIMESTAMP(3)"
     }, {
       "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "VARCHAR(2147483647)"
+      "inputIndex" : 2,
+      "type" : "DOUBLE"
     } ],
     "condition" : null,
     "inputProperties" : [ {
@@ -121,10 +92,10 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` INT, `rowtime` TIMESTAMP(3), `c` VARCHAR(2147483647)>",
-    "description" : "Calc(select=[a, TO_TIMESTAMP(c) AS rowtime, c])"
+    "outputType" : "ROW<`int` INT, `rowtime` TIMESTAMP(3), `double` DOUBLE>",
+    "description" : "Calc(select=[int, TO_TIMESTAMP(ts) AS rowtime, double])"
   }, {
-    "id" : 3,
+    "id" : 9,
     "type" : "stream-exec-watermark-assigner_1",
     "watermarkExpr" : {
       "kind" : "CALL",
@@ -152,7 +123,7 @@
     "outputType" : {
       "type" : "ROW",
       "fields" : [ {
-        "name" : "a",
+        "name" : "int",
         "fieldType" : "INT"
       }, {
         "name" : "rowtime",
@@ -162,13 +133,13 @@
           "kind" : "ROWTIME"
         }
       }, {
-        "name" : "c",
-        "fieldType" : "VARCHAR(2147483647)"
+        "name" : "double",
+        "fieldType" : "DOUBLE"
       } ]
     },
     "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime 
- 1000:INTERVAL SECOND)])"
   }, {
-    "id" : 4,
+    "id" : 10,
     "type" : "stream-exec-exchange_1",
     "inputProperties" : [ {
       "requiredDistribution" : {
@@ -180,7 +151,7 @@
     "outputType" : {
       "type" : "ROW",
       "fields" : [ {
-        "name" : "a",
+        "name" : "int",
         "fieldType" : "INT"
       }, {
         "name" : "rowtime",
@@ -190,13 +161,13 @@
           "kind" : "ROWTIME"
         }
       }, {
-        "name" : "c",
-        "fieldType" : "VARCHAR(2147483647)"
+        "name" : "double",
+        "fieldType" : "DOUBLE"
       } ]
     },
     "description" : "Exchange(distribution=[single])"
   }, {
-    "id" : 5,
+    "id" : 11,
     "type" : "stream-exec-temporal-sort_1",
     "orderBy" : {
       "fields" : [ {
@@ -219,7 +190,7 @@
     "outputType" : {
       "type" : "ROW",
       "fields" : [ {
-        "name" : "a",
+        "name" : "int",
         "fieldType" : "INT"
       }, {
         "name" : "rowtime",
@@ -229,13 +200,13 @@
           "kind" : "ROWTIME"
         }
       }, {
-        "name" : "c",
-        "fieldType" : "VARCHAR(2147483647)"
+        "name" : "double",
+        "fieldType" : "DOUBLE"
       } ]
     },
-    "description" : "TemporalSort(orderBy=[rowtime ASC, c ASC])"
+    "description" : "TemporalSort(orderBy=[rowtime ASC, double ASC])"
   }, {
-    "id" : 6,
+    "id" : 12,
     "type" : "stream-exec-calc_1",
     "projection" : [ {
       "kind" : "INPUT_REF",
@@ -250,10 +221,10 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` INT>",
-    "description" : "Calc(select=[a])"
+    "outputType" : "ROW<`int` INT>",
+    "description" : "Calc(select=[int])"
   }, {
-    "id" : 7,
+    "id" : 13,
     "type" : "stream-exec-sink_1",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
@@ -264,7 +235,7 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
@@ -273,10 +244,7 @@
             } ],
             "watermarkSpecs" : [ ]
           },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values"
-          }
+          "partitionKeys" : [ ]
         }
       }
     },
@@ -288,50 +256,50 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` INT>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a])"
+    "outputType" : "ROW<`int` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[int])"
   } ],
   "edges" : [ {
-    "source" : 1,
-    "target" : 2,
+    "source" : 7,
+    "target" : 8,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 2,
-    "target" : 3,
+    "source" : 8,
+    "target" : 9,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 3,
-    "target" : 4,
+    "source" : 9,
+    "target" : 10,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 4,
-    "target" : 5,
+    "source" : 10,
+    "target" : 11,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 5,
-    "target" : 6,
+    "source" : 11,
+    "target" : 12,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 6,
-    "target" : 7,
+    "source" : 12,
+    "target" : 13,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-rowtime/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-rowtime/savepoint/_metadata
new file mode 100644
index 00000000000..9dd97efe2dd
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-rowtime/savepoint/_metadata differ


Reply via email to