This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5ff359b579b44b4050111f2cee58c1debbb5dcf1
Author: godfreyhe <[email protected]>
AuthorDate: Fri Mar 19 18:43:08 2021 +0800

    [FLINK-21869][table-planner-blink] Support StreamExecTemporalSort json serialization/deserialization
    
    This closes #15284
---
 .../nodes/exec/stream/StreamExecTemporalSort.java  |  30 +-
 .../nodes/exec/stream/JsonSerdeCoverageTest.java   |   1 -
 .../exec/stream/TemporalSortJsonPlanTest.java      |  68 ++++
 .../stream/jsonplan/TemporalSortJsonITCase.java    |  89 +++++
 .../table/planner/utils/JsonPlanTestBase.java      |  12 +-
 .../testSortProcessingTime.out                     | 386 +++++++++++++++++++++
 .../testSortRowTime.out                            | 346 ++++++++++++++++++
 7 files changed, 922 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java
index fc5b90e..4380875 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java
@@ -40,12 +40,22 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampKind;
 import org.apache.flink.table.types.logical.TimestampType;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** {@link StreamExecNode} for time-ascending-order Sort without `limit`. */
 public class StreamExecTemporalSort extends ExecNodeBase<RowData>
         implements StreamExecNode<RowData>, 
MultipleTransformationTranslator<RowData> {
 
+    public static final String FIELD_NAME_SORT_SPEC = "orderBy";
+
+    @JsonProperty(FIELD_NAME_SORT_SPEC)
     private final SortSpec sortSpec;
 
     public StreamExecTemporalSort(
@@ -53,8 +63,24 @@ public class StreamExecTemporalSort extends ExecNodeBase<RowData>
             InputProperty inputProperty,
             RowType outputType,
             String description) {
-        super(Collections.singletonList(inputProperty), outputType, 
description);
-        this.sortSpec = sortSpec;
+        this(
+                sortSpec,
+                getNewNodeId(),
+                Collections.singletonList(inputProperty),
+                outputType,
+                description);
+    }
+
+    @JsonCreator
+    public StreamExecTemporalSort(
+            @JsonProperty(FIELD_NAME_SORT_SPEC) SortSpec sortSpec,
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+        super(id, inputProperties, outputType, description);
+        checkArgument(inputProperties.size() == 1);
+        this.sortSpec = checkNotNull(sortSpec);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java
index 5879c2ce..f737719 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java
@@ -51,7 +51,6 @@ public class JsonSerdeCoverageTest {
                     "StreamExecPythonOverAggregate",
                     "StreamExecPythonCorrelate",
                     "StreamExecPythonCalc",
-                    "StreamExecTemporalSort",
                     "StreamExecSort",
                     "StreamExecMultipleInput",
                     "StreamExecValues");
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest.java
new file mode 100644
index 0000000..3ba476d
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test json serialization/deserialization for temporal sort. */
+public class TemporalSortJsonPlanTest extends TableTestBase {
+
+    private StreamTableTestUtil util;
+
+    @Before
+    public void setup() {
+        util = streamTestUtil(TableConfig.getDefault());
+        TableEnvironment tEnv = util.getTableEnv();
+
+        String srcTableDdl =
+                "CREATE TABLE MyTable (\n"
+                        + " a INT,\n"
+                        + " b BIGINT,\n"
+                        + " c VARCHAR,\n"
+                        + " `rowtime` AS TO_TIMESTAMP(c),\n"
+                        + " proctime as PROCTIME(),\n"
+                        + " WATERMARK for `rowtime` AS `rowtime` - INTERVAL 
'1' SECOND\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values')\n";
+        tEnv.executeSql(srcTableDdl);
+
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + " a INT\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values')\n";
+        tEnv.executeSql(sinkTableDdl);
+    }
+
+    @Test
+    public void testSortProcessingTime() {
+        util.verifyJsonPlan("insert into MySink SELECT a FROM MyTable order by 
proctime, c");
+    }
+
+    @Test
+    public void testSortRowTime() {
+        util.verifyJsonPlan("insert into MySink SELECT a FROM MyTable order by 
rowtime, c");
+    }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java
new file mode 100644
index 0000000..0e117c8
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.jsonplan;
+
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.JsonPlanTestBase;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test for temporal sort json plan. */
+public class TemporalSortJsonITCase extends JsonPlanTestBase {
+
+    @Test
+    public void testSortProcessingTime() throws Exception {
+        createTestValuesSourceTable(
+                "MyTable",
+                JavaScalaConversionUtil.toJava(TestData.smallData3()),
+                "a INT",
+                "b BIGINT",
+                "c STRING",
+                "proctime as PROCTIME()");
+        createTestValuesSinkTable("MySink", "a INT");
+        String jsonPlan =
+                tableEnv.getJsonPlan("insert into MySink SELECT a FROM MyTable 
order by proctime");
+        tableEnv.executeJsonPlan(jsonPlan).await();
+
+        assertResult(
+                Arrays.asList("+I[1]", "+I[2]", "+I[3]"),
+                TestValuesTableFactory.getResults("MySink"));
+    }
+
+    @Test
+    public void testSortRowTime() throws Exception {
+        createTestValuesSourceTable(
+                "MyTable",
+                JavaScalaConversionUtil.toJava(TestData.windowData()),
+                new String[] {
+                    "ts STRING",
+                    "`int` INT",
+                    "`double` DOUBLE",
+                    "`float` FLOAT",
+                    "`bigdec` DECIMAL(10, 2)",
+                    "`string` STRING",
+                    "`name` STRING",
+                    "`rowtime` AS TO_TIMESTAMP(`ts`)",
+                    "WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' 
SECOND",
+                },
+                new HashMap<String, String>() {
+                    {
+                        put("enable-watermark-push-down", "true");
+                        put("failing-source", "true");
+                    }
+                });
+        createTestValuesSinkTable("MySink", "`int` INT");
+        String jsonPlan =
+                tableEnv.getJsonPlan(
+                        "insert into MySink SELECT `int` FROM MyTable order by 
rowtime, `double`");
+        tableEnv.executeJsonPlan(jsonPlan).await();
+
+        assertEquals(
+                Arrays.asList(
+                        "+I[1]", "+I[2]", "+I[2]", "+I[5]", "+I[6]", "+I[3]", 
"+I[3]", "+I[4]",
+                        "+I[7]", "+I[1]"),
+                TestValuesTableFactory.getResults("MySink"));
+    }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
index 486e84c..a12c480 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.StringUtils;
 
@@ -30,7 +31,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nullable;
 
@@ -50,12 +50,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.junit.Assert.assertEquals;
 
 /** The base class for json plan testing. */
-public abstract class JsonPlanTestBase {
+public abstract class JsonPlanTestBase extends AbstractTestBase {
 
     @Rule public ExpectedException exception = ExpectedException.none();
 
-    @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
-
     protected TableEnvironmentInternal tableEnv;
 
     @Before
@@ -166,7 +164,7 @@ public abstract class JsonPlanTestBase {
     protected void createTestCsvSourceTable(
             String tableName, List<String> data, String... fieldNameAndTypes) 
throws IOException {
         checkArgument(fieldNameAndTypes.length > 0);
-        File sourceFile = tmpFolder.newFile();
+        File sourceFile = TEMPORARY_FOLDER.newFile();
         Collections.shuffle(data);
         Files.write(sourceFile.toPath(), String.join("\n", data).getBytes());
         String ddl =
@@ -196,7 +194,7 @@ public abstract class JsonPlanTestBase {
                 StringUtils.isNullOrWhitespaceOnly(partitionFields)
                         ? ""
                         : "\n partitioned by (" + partitionFields + ") \n";
-        File sinkPath = tmpFolder.newFolder();
+        File sinkPath = TEMPORARY_FOLDER.newFolder();
         String ddl =
                 String.format(
                         "CREATE TABLE %s (\n"
@@ -218,7 +216,7 @@ public abstract class JsonPlanTestBase {
         assertResult(expected, actual);
     }
 
-    protected void assertResult(List<String> expected, List<String> actual) 
throws IOException {
+    protected void assertResult(List<String> expected, List<String> actual) {
         Collections.sort(expected);
         Collections.sort(actual);
         assertEquals(expected, actual);
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortProcessingTime.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortProcessingTime.out
new file mode 100644
index 0000000..8b33082
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortProcessingTime.out
@@ -0,0 +1,386 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
+    "scanTableSource" : {
+      "identifier" : {
+        "catalogName" : "default_catalog",
+        "databaseName" : "default_database",
+        "tableName" : "MyTable"
+      },
+      "catalogTable" : {
+        "schema.watermark.0.strategy.expr" : "`rowtime` - INTERVAL '1' SECOND",
+        "schema.4.expr" : "PROCTIME()",
+        "schema.0.data-type" : "INT",
+        "schema.2.name" : "c",
+        "schema.1.name" : "b",
+        "schema.4.name" : "proctime",
+        "schema.1.data-type" : "BIGINT",
+        "schema.3.data-type" : "TIMESTAMP(3)",
+        "schema.2.data-type" : "VARCHAR(2147483647)",
+        "schema.3.name" : "rowtime",
+        "connector" : "values",
+        "schema.watermark.0.rowtime" : "rowtime",
+        "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)",
+        "schema.3.expr" : "TO_TIMESTAMP(`c`)",
+        "schema.4.data-type" : "TIMESTAMP(3) NOT NULL",
+        "schema.0.name" : "a"
+      },
+      "sourceAbilitySpecs" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 2 ] ],
+        "producedType" : {
+          "type" : "ROW",
+          "nullable" : false,
+          "fields" : [ {
+            "a" : "INT"
+          }, {
+            "c" : "VARCHAR(2147483647)"
+          } ]
+        }
+      } ]
+    },
+    "id" : 1,
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "c" : "VARCHAR(2147483647)"
+      } ]
+    },
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, c]]], fields=[a, c])",
+    "inputProperties" : [ ]
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "REX_CALL",
+      "operator" : {
+        "name" : "PROCTIME",
+        "kind" : "OTHER_FUNCTION",
+        "syntax" : "FUNCTION"
+      },
+      "operands" : [ ],
+      "type" : {
+        "timestampKind" : "PROCTIME",
+        "nullable" : false
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : {
+        "typeName" : "VARCHAR",
+        "nullable" : true,
+        "precision" : 2147483647
+      }
+    }, {
+      "kind" : "REX_CALL",
+      "operator" : {
+        "name" : "TO_TIMESTAMP",
+        "kind" : "OTHER_FUNCTION",
+        "syntax" : "FUNCTION"
+      },
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : {
+          "typeName" : "VARCHAR",
+          "nullable" : true,
+          "precision" : 2147483647
+        }
+      } ],
+      "type" : {
+        "typeName" : "TIMESTAMP",
+        "nullable" : true,
+        "precision" : 3
+      }
+    } ],
+    "condition" : null,
+    "id" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "proctime" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "c" : "VARCHAR(2147483647)"
+      }, {
+        "rowtime" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      } ]
+    },
+    "description" : "Calc(select=[a, PROCTIME() AS proctime, c, 
TO_TIMESTAMP(c) AS rowtime])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner",
+    "watermarkExpr" : {
+      "kind" : "REX_CALL",
+      "operator" : {
+        "name" : "-",
+        "kind" : "MINUS",
+        "syntax" : "SPECIAL"
+      },
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 3,
+        "type" : {
+          "typeName" : "TIMESTAMP",
+          "nullable" : true,
+          "precision" : 3
+        }
+      }, {
+        "kind" : "LITERAL",
+        "value" : 1000,
+        "type" : {
+          "typeName" : "INTERVAL_SECOND",
+          "nullable" : false,
+          "precision" : 2,
+          "scale" : 6
+        }
+      } ],
+      "type" : {
+        "typeName" : "TIMESTAMP",
+        "nullable" : true,
+        "precision" : 3
+      }
+    },
+    "rowtimeFieldIndex" : 3,
+    "id" : 3,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "proctime" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "c" : "VARCHAR(2147483647)"
+      }, {
+        "rowtime" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime 
- 1000:INTERVAL SECOND)])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange",
+    "id" : 4,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "proctime" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "c" : "VARCHAR(2147483647)"
+      }, {
+        "rowtime" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[single])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalSort",
+    "orderBy" : {
+      "fields" : [ {
+        "index" : 1,
+        "isAscending" : true,
+        "nullIsLast" : false
+      }, {
+        "index" : 2,
+        "isAscending" : true,
+        "nullIsLast" : false
+      } ]
+    },
+    "id" : 5,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "proctime" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "c" : "VARCHAR(2147483647)"
+      }, {
+        "rowtime" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "TemporalSort(orderBy=[proctime ASC, c ASC])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    } ],
+    "condition" : null,
+    "id" : 6,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      } ]
+    },
+    "description" : "Calc(select=[a])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink",
+    "dynamicTableSink" : {
+      "identifier" : {
+        "catalogName" : "default_catalog",
+        "databaseName" : "default_database",
+        "tableName" : "MySink"
+      },
+      "catalogTable" : {
+        "connector" : "values",
+        "schema.0.data-type" : "INT",
+        "schema.0.name" : "a"
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "id" : 7,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      } ]
+    },
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortRowTime.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortRowTime.out
new file mode 100644
index 0000000..0fbc3a7
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortRowTime.out
@@ -0,0 +1,346 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
+    "scanTableSource" : {
+      "identifier" : {
+        "catalogName" : "default_catalog",
+        "databaseName" : "default_database",
+        "tableName" : "MyTable"
+      },
+      "catalogTable" : {
+        "schema.watermark.0.strategy.expr" : "`rowtime` - INTERVAL '1' SECOND",
+        "schema.4.expr" : "PROCTIME()",
+        "schema.0.data-type" : "INT",
+        "schema.2.name" : "c",
+        "schema.1.name" : "b",
+        "schema.4.name" : "proctime",
+        "schema.1.data-type" : "BIGINT",
+        "schema.3.data-type" : "TIMESTAMP(3)",
+        "schema.2.data-type" : "VARCHAR(2147483647)",
+        "schema.3.name" : "rowtime",
+        "connector" : "values",
+        "schema.watermark.0.rowtime" : "rowtime",
+        "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)",
+        "schema.3.expr" : "TO_TIMESTAMP(`c`)",
+        "schema.4.data-type" : "TIMESTAMP(3) NOT NULL",
+        "schema.0.name" : "a"
+      },
+      "sourceAbilitySpecs" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 2 ] ],
+        "producedType" : {
+          "type" : "ROW",
+          "nullable" : false,
+          "fields" : [ {
+            "a" : "INT"
+          }, {
+            "c" : "VARCHAR(2147483647)"
+          } ]
+        }
+      } ]
+    },
+    "id" : 8,
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "c" : "VARCHAR(2147483647)"
+      } ]
+    },
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, c]]], fields=[a, c])",
+    "inputProperties" : [ ]
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "REX_CALL",
+      "operator" : {
+        "name" : "TO_TIMESTAMP",
+        "kind" : "OTHER_FUNCTION",
+        "syntax" : "FUNCTION"
+      },
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : {
+          "typeName" : "VARCHAR",
+          "nullable" : true,
+          "precision" : 2147483647
+        }
+      } ],
+      "type" : {
+        "typeName" : "TIMESTAMP",
+        "nullable" : true,
+        "precision" : 3
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : {
+        "typeName" : "VARCHAR",
+        "nullable" : true,
+        "precision" : 2147483647
+      }
+    } ],
+    "condition" : null,
+    "id" : 9,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "rowtime" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      }, {
+        "c" : "VARCHAR(2147483647)"
+      } ]
+    },
+    "description" : "Calc(select=[a, TO_TIMESTAMP(c) AS rowtime, c])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner",
+    "watermarkExpr" : {
+      "kind" : "REX_CALL",
+      "operator" : {
+        "name" : "-",
+        "kind" : "MINUS",
+        "syntax" : "SPECIAL"
+      },
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : {
+          "typeName" : "TIMESTAMP",
+          "nullable" : true,
+          "precision" : 3
+        }
+      }, {
+        "kind" : "LITERAL",
+        "value" : 1000,
+        "type" : {
+          "typeName" : "INTERVAL_SECOND",
+          "nullable" : false,
+          "precision" : 2,
+          "scale" : 6
+        }
+      } ],
+      "type" : {
+        "typeName" : "TIMESTAMP",
+        "nullable" : true,
+        "precision" : 3
+      }
+    },
+    "rowtimeFieldIndex" : 1,
+    "id" : 10,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "rowtime" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "c" : "VARCHAR(2147483647)"
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime 
- 1000:INTERVAL SECOND)])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange",
+    "id" : 11,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "rowtime" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "c" : "VARCHAR(2147483647)"
+      } ]
+    },
+    "description" : "Exchange(distribution=[single])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalSort",
+    "orderBy" : {
+      "fields" : [ {
+        "index" : 1,
+        "isAscending" : true,
+        "nullIsLast" : false
+      }, {
+        "index" : 2,
+        "isAscending" : true,
+        "nullIsLast" : false
+      } ]
+    },
+    "id" : 12,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "rowtime" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "c" : "VARCHAR(2147483647)"
+      } ]
+    },
+    "description" : "TemporalSort(orderBy=[rowtime ASC, c ASC])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    } ],
+    "condition" : null,
+    "id" : 13,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      } ]
+    },
+    "description" : "Calc(select=[a])"
+  }, {
+    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink",
+    "dynamicTableSink" : {
+      "identifier" : {
+        "catalogName" : "default_catalog",
+        "databaseName" : "default_database",
+        "tableName" : "MySink"
+      },
+      "catalogTable" : {
+        "connector" : "values",
+        "schema.0.data-type" : "INT",
+        "schema.0.name" : "a"
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "id" : 14,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      } ]
+    },
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a])"
+  } ],
+  "edges" : [ {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 12,
+    "target" : 13,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 13,
+    "target" : 14,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file

Reply via email to