This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5ff359b579b44b4050111f2cee58c1debbb5dcf1 Author: godfreyhe <[email protected]> AuthorDate: Fri Mar 19 18:43:08 2021 +0800 [FLINK-21869][table-planner-blink] Support StreamExecTemporalSort json serialization/deserialization This closes #15284 --- .../nodes/exec/stream/StreamExecTemporalSort.java | 30 +- .../nodes/exec/stream/JsonSerdeCoverageTest.java | 1 - .../exec/stream/TemporalSortJsonPlanTest.java | 68 ++++ .../stream/jsonplan/TemporalSortJsonITCase.java | 89 +++++ .../table/planner/utils/JsonPlanTestBase.java | 12 +- .../testSortProcessingTime.out | 386 +++++++++++++++++++++ .../testSortRowTime.out | 346 ++++++++++++++++++ 7 files changed, 922 insertions(+), 10 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java index fc5b90e..4380875 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java @@ -40,12 +40,22 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.TimestampKind; import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + import java.util.Collections; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** {@link StreamExecNode} for time-ascending-order Sort without `limit`. */ public class StreamExecTemporalSort extends ExecNodeBase<RowData> implements StreamExecNode<RowData>, MultipleTransformationTranslator<RowData> { + public static final String FIELD_NAME_SORT_SPEC = "orderBy"; + + @JsonProperty(FIELD_NAME_SORT_SPEC) private final SortSpec sortSpec; public StreamExecTemporalSort( @@ -53,8 +63,24 @@ public class StreamExecTemporalSort extends ExecNodeBase<RowData> InputProperty inputProperty, RowType outputType, String description) { - super(Collections.singletonList(inputProperty), outputType, description); - this.sortSpec = sortSpec; + this( + sortSpec, + getNewNodeId(), + Collections.singletonList(inputProperty), + outputType, + description); + } + + @JsonCreator + public StreamExecTemporalSort( + @JsonProperty(FIELD_NAME_SORT_SPEC) SortSpec sortSpec, + @JsonProperty(FIELD_NAME_ID) int id, + @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties, + @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType, + @JsonProperty(FIELD_NAME_DESCRIPTION) String description) { + super(id, inputProperties, outputType, description); + checkArgument(inputProperties.size() == 1); + this.sortSpec = checkNotNull(sortSpec); } @SuppressWarnings("unchecked") diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java index 5879c2ce..f737719 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java @@ -51,7 +51,6 @@ public class JsonSerdeCoverageTest { "StreamExecPythonOverAggregate", "StreamExecPythonCorrelate", "StreamExecPythonCalc", - "StreamExecTemporalSort", "StreamExecSort", "StreamExecMultipleInput", "StreamExecValues"); diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest.java new file mode 100644 index 0000000..3ba476d --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Before; +import org.junit.Test; + +/** Test json serialization/deserialization for temporal sort. */ +public class TemporalSortJsonPlanTest extends TableTestBase { + + private StreamTableTestUtil util; + + @Before + public void setup() { + util = streamTestUtil(TableConfig.getDefault()); + TableEnvironment tEnv = util.getTableEnv(); + + String srcTableDdl = + "CREATE TABLE MyTable (\n" + + " a INT,\n" + + " b BIGINT,\n" + + " c VARCHAR,\n" + + " `rowtime` AS TO_TIMESTAMP(c),\n" + + " proctime as PROCTIME(),\n" + + " WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n" + + ") WITH (\n" + + " 'connector' = 'values')\n"; + tEnv.executeSql(srcTableDdl); + + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " a INT\n" + + ") WITH (\n" + + " 'connector' = 'values')\n"; + tEnv.executeSql(sinkTableDdl); + } + + @Test + public void testSortProcessingTime() { + util.verifyJsonPlan("insert into MySink SELECT a FROM MyTable order by proctime, c"); + } + + @Test + public void testSortRowTime() { + util.verifyJsonPlan("insert into MySink SELECT a FROM MyTable order by rowtime, c"); + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java new file mode 100644 index 0000000..0e117c8 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.stream.jsonplan; + +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.planner.runtime.utils.TestData; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.planner.utils.JsonPlanTestBase; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; + +import static org.junit.Assert.assertEquals; + +/** Test for temporal sort json plan. */ +public class TemporalSortJsonITCase extends JsonPlanTestBase { + + @Test + public void testSortProcessingTime() throws Exception { + createTestValuesSourceTable( + "MyTable", + JavaScalaConversionUtil.toJava(TestData.smallData3()), + "a INT", + "b BIGINT", + "c STRING", + "proctime as PROCTIME()"); + createTestValuesSinkTable("MySink", "a INT"); + String jsonPlan = + tableEnv.getJsonPlan("insert into MySink SELECT a FROM MyTable order by proctime"); + tableEnv.executeJsonPlan(jsonPlan).await(); + + assertResult( + Arrays.asList("+I[1]", "+I[2]", "+I[3]"), + TestValuesTableFactory.getResults("MySink")); + } + + @Test + public void testSortRowTime() throws Exception { + createTestValuesSourceTable( + "MyTable", + JavaScalaConversionUtil.toJava(TestData.windowData()), + new String[] { + "ts STRING", + "`int` INT", + "`double` DOUBLE", + "`float` FLOAT", + "`bigdec` DECIMAL(10, 2)", + "`string` STRING", + "`name` STRING", + "`rowtime` AS TO_TIMESTAMP(`ts`)", + "WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND", + }, + new HashMap<String, String>() { + { + put("enable-watermark-push-down", "true"); + put("failing-source", "true"); + } + }); + createTestValuesSinkTable("MySink", "`int` INT"); + String jsonPlan = + tableEnv.getJsonPlan( + "insert into MySink SELECT `int` FROM MyTable order by rowtime, `double`"); + tableEnv.executeJsonPlan(jsonPlan).await(); + + assertEquals( + Arrays.asList( + "+I[1]", "+I[2]", "+I[2]", "+I[5]", "+I[6]", "+I[3]", "+I[3]", "+I[4]", + "+I[7]", "+I[1]"), + TestValuesTableFactory.getResults("MySink")); + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java index 486e84c..a12c480 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java @@ -23,6 +23,7 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.types.Row; import org.apache.flink.util.StringUtils; @@ -30,7 +31,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.rules.ExpectedException; -import org.junit.rules.TemporaryFolder; import javax.annotation.Nullable; @@ -50,12 +50,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull; import static org.junit.Assert.assertEquals; /** The base class for json plan testing. */ -public abstract class JsonPlanTestBase { +public abstract class JsonPlanTestBase extends AbstractTestBase { @Rule public ExpectedException exception = ExpectedException.none(); - @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); - protected TableEnvironmentInternal tableEnv; @Before @@ -166,7 +164,7 @@ public abstract class JsonPlanTestBase { protected void createTestCsvSourceTable( String tableName, List<String> data, String... fieldNameAndTypes) throws IOException { checkArgument(fieldNameAndTypes.length > 0); - File sourceFile = tmpFolder.newFile(); + File sourceFile = TEMPORARY_FOLDER.newFile(); Collections.shuffle(data); Files.write(sourceFile.toPath(), String.join("\n", data).getBytes()); String ddl = @@ -196,7 +194,7 @@ public abstract class JsonPlanTestBase { StringUtils.isNullOrWhitespaceOnly(partitionFields) ? "" : "\n partitioned by (" + partitionFields + ") \n"; - File sinkPath = tmpFolder.newFolder(); + File sinkPath = TEMPORARY_FOLDER.newFolder(); String ddl = String.format( "CREATE TABLE %s (\n" @@ -218,7 +216,7 @@ public abstract class JsonPlanTestBase { assertResult(expected, actual); } - protected void assertResult(List<String> expected, List<String> actual) throws IOException { + protected void assertResult(List<String> expected, List<String> actual) { Collections.sort(expected); Collections.sort(actual); assertEquals(expected, actual); diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortProcessingTime.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortProcessingTime.out new file mode 100644 index 0000000..8b33082 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortProcessingTime.out @@ -0,0 +1,386 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.watermark.0.strategy.expr" : "`rowtime` - INTERVAL '1' SECOND", + "schema.4.expr" : "PROCTIME()", + "schema.0.data-type" : "INT", + "schema.2.name" : "c", + "schema.1.name" : "b", + "schema.4.name" : "proctime", + "schema.1.data-type" : "BIGINT", + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "VARCHAR(2147483647)", + "schema.3.name" : "rowtime", + "connector" : "values", + "schema.watermark.0.rowtime" : "rowtime", + "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)", + "schema.3.expr" : "TO_TIMESTAMP(`c`)", + "schema.4.data-type" : "TIMESTAMP(3) NOT NULL", + "schema.0.name" : "a" + }, + "sourceAbilitySpecs" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 2 ] ], + "producedType" : { + "type" : "ROW", + "nullable" : false, + "fields" : [ { + "a" : "INT" + }, { + "c" : "VARCHAR(2147483647)" + } ] + } + } ] + }, + "id" : 1, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "VARCHAR(2147483647)" + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c]]], fields=[a, c])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "PROCTIME", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ ], + "type" : { + "timestampKind" : "PROCTIME", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "TO_TIMESTAMP", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + } ], + "condition" : null, + "id" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + } ] + }, + "description" : "Calc(select=[a, PROCTIME() AS proctime, c, TO_TIMESTAMP(c) AS rowtime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner", + "watermarkExpr" : { + "kind" : "REX_CALL", + "operator" : { + "name" : "-", + "kind" : "MINUS", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "LITERAL", + "value" : 1000, + "type" : { + "typeName" : "INTERVAL_SECOND", + "nullable" : false, + "precision" : 2, + "scale" : 6 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, + "rowtimeFieldIndex" : 3, + "id" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", + "id" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "SINGLETON" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Exchange(distribution=[single])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalSort", + "orderBy" : { + "fields" : [ { + "index" : 1, + "isAscending" : true, + "nullIsLast" : false + }, { + "index" : 2, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "id" : 5, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "proctime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "TemporalSort(orderBy=[proctime ASC, c ASC])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "condition" : null, + "id" : 6, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + } ] + }, + "description" : "Calc(select=[a])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "connector" : "values", + "schema.0.data-type" : "INT", + "schema.0.name" : "a" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "id" : 7, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortRowTime.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortRowTime.out new file mode 100644 index 0000000..0fbc3a7 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortRowTime.out @@ -0,0 +1,346 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.watermark.0.strategy.expr" : "`rowtime` - INTERVAL '1' SECOND", + "schema.4.expr" : "PROCTIME()", + "schema.0.data-type" : "INT", + "schema.2.name" : "c", + "schema.1.name" : "b", + "schema.4.name" : "proctime", + "schema.1.data-type" : "BIGINT", + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "VARCHAR(2147483647)", + "schema.3.name" : "rowtime", + "connector" : "values", + "schema.watermark.0.rowtime" : "rowtime", + "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)", + "schema.3.expr" : "TO_TIMESTAMP(`c`)", + "schema.4.data-type" : "TIMESTAMP(3) NOT NULL", + "schema.0.name" : "a" + }, + "sourceAbilitySpecs" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 2 ] ], + "producedType" : { + "type" : "ROW", + "nullable" : false, + "fields" : [ { + "a" : "INT" + }, { + "c" : "VARCHAR(2147483647)" + } ] + } + } ] + }, + "id" : 8, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "c" : "VARCHAR(2147483647)" + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c]]], fields=[a, c])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "TO_TIMESTAMP", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + } ], + "condition" : null, + "id" : 9, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "c" : "VARCHAR(2147483647)" + } ] + }, + "description" : "Calc(select=[a, TO_TIMESTAMP(c) AS rowtime, c])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner", + "watermarkExpr" : { + "kind" : "REX_CALL", + "operator" : { + "name" : "-", + "kind" : "MINUS", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "LITERAL", + "value" : 1000, + "type" : { + "typeName" : "INTERVAL_SECOND", + "nullable" : false, + "precision" : 2, + "scale" : 6 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, + "rowtimeFieldIndex" : 1, + "id" : 10, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "c" : "VARCHAR(2147483647)" + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", + "id" : 11, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "SINGLETON" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "c" : "VARCHAR(2147483647)" + } ] + }, + "description" : "Exchange(distribution=[single])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalSort", + "orderBy" : { + "fields" : [ { + "index" : 1, + "isAscending" : true, + "nullIsLast" : false + }, { + "index" : 2, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "id" : 12, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "c" : "VARCHAR(2147483647)" + } ] + }, + "description" : "TemporalSort(orderBy=[rowtime ASC, c ASC])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "condition" : null, + "id" : 13, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + } ] + }, + "description" : "Calc(select=[a])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "connector" : "values", + "schema.0.data-type" : "INT", + "schema.0.name" : "a" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "id" : 14, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a])" + } ], + "edges" : [ { + "source" : 8, + "target" : 9, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 9, + "target" : 10, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 10, + "target" : 11, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 11, + "target" : 12, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 12, + "target" : 13, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 13, + "target" : 14, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file
