This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new f751a00fd6f [FLINK-33758] Implement restore tests for TemporalSort node (#23879) f751a00fd6f is described below commit f751a00fd6f0e70187d2a9ae2ccd6a728d9a2c64 Author: James Hughes <jhug...@confluent.io> AuthorDate: Thu Dec 7 07:51:40 2023 -0500 [FLINK-33758] Implement restore tests for TemporalSort node (#23879) --- .../exec/stream/TemporalSortJsonPlanTest.java | 68 -------- .../nodes/exec/stream/TemporalSortRestoreTest.java | 40 +++++ .../exec/stream/TemporalSortTestPrograms.java | 96 +++++++++++ .../stream/jsonplan/TemporalSortJsonITCase.java | 87 ---------- .../plan/temporal-sort-proctime.json} | 180 +++------------------ .../temporal-sort-proctime/savepoint/_metadata | Bin 0 -> 6037 bytes .../plan/temporal-sort-rowtime.json} | 144 +++++++---------- .../temporal-sort-rowtime/savepoint/_metadata | Bin 0 -> 10729 bytes 8 files changed, 210 insertions(+), 405 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest.java deleted file mode 100644 index 8ebd6a47e59..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream; - -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.planner.utils.StreamTableTestUtil; -import org.apache.flink.table.planner.utils.TableTestBase; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** Test json serialization/deserialization for temporal sort. */ -class TemporalSortJsonPlanTest extends TableTestBase { - - private StreamTableTestUtil util; - - @BeforeEach - void setup() { - util = streamTestUtil(TableConfig.getDefault()); - TableEnvironment tEnv = util.getTableEnv(); - - String srcTableDdl = - "CREATE TABLE MyTable (\n" - + " a INT,\n" - + " b BIGINT,\n" - + " c VARCHAR,\n" - + " `rowtime` AS TO_TIMESTAMP(c),\n" - + " proctime as PROCTIME(),\n" - + " WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(srcTableDdl); - - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a INT\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - } - - @Test - void testSortProcessingTime() { - util.verifyJsonPlan("insert into MySink SELECT a FROM MyTable order by proctime, c"); - } - - @Test - void testSortRowTime() { - util.verifyJsonPlan("insert into MySink SELECT a FROM MyTable order by rowtime, c"); - } -} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortRestoreTest.java new file mode 100644 index 00000000000..f6a1b0fcea9 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortRestoreTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.Arrays; +import java.util.List; + +/** Restore tests for {@link StreamExecTemporalSort}. */ +public class TemporalSortRestoreTest extends RestoreTestBase { + + public TemporalSortRestoreTest() { + super(StreamExecTemporalSort.class); + } + + @Override + public List<TableTestProgram> programs() { + return Arrays.asList( + TemporalSortTestPrograms.TEMPORAL_SORT_PROCTIME, + TemporalSortTestPrograms.TEMPORAL_SORT_ROWTIME); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortTestPrograms.java new file mode 100644 index 00000000000..c340435c7a2 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortTestPrograms.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecTemporalSort}. */ +public class TemporalSortTestPrograms { + + static final Row[] BEFORE_DATA = { + Row.of("2020-10-10 00:00:01", 1, 1d), + Row.of("2020-10-10 00:00:02", 2, 2d), + Row.of("2020-10-10 00:00:07", 5, 6d), + Row.of("2020-10-10 00:00:07", 3, 3d), + // out of order + Row.of("2020-10-10 00:00:06", 6, 6d), + Row.of("2020-10-10 00:00:08", 3, null), + // late event + Row.of("2020-10-10 00:00:04", 5, 5d), + Row.of("2020-10-10 00:00:16", 4, 4d) + }; + + static final Row[] AFTER_DATA = { + Row.of("2020-10-10 00:00:40", 10, 3d), Row.of("2020-10-10 00:00:42", 11, 4d) + }; + static final TableTestProgram TEMPORAL_SORT_PROCTIME = + TableTestProgram.of( + "temporal-sort-proctime", "validates temporal sort node with proctime") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema( + "a INT", + "b BIGINT", + "c STRING", + "`proctime` as PROCTIME()") + .producedBeforeRestore( + Row.of(1, 1L, "Hi"), + Row.of(2, 2L, "Hello"), + Row.of(3, 2L, "Hello world")) + .producedAfterRestore( + Row.of(4, 1L, "Guten Morgen"), + Row.of(5, 2L, "Guten Tag")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("a INT") + .consumedBeforeRestore("+I[1]", "+I[2]", "+I[3]") + .consumedAfterRestore("+I[4]", "+I[5]") + .build()) + .runSql("INSERT INTO sink_t SELECT a from source_t ORDER BY proctime") + .build(); + + static final TableTestProgram TEMPORAL_SORT_ROWTIME = + TableTestProgram.of( + "temporal-sort-rowtime", "validates temporal sort node with rowtime") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema( + "ts STRING", + "`int` INT", + "`double` DOUBLE", + "`rowtime` AS TO_TIMESTAMP(`ts`)", + "WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND") + .producedBeforeRestore(BEFORE_DATA) + .producedAfterRestore(AFTER_DATA) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("a INT") + .consumedBeforeRestore( + "+I[1]", "+I[2]", "+I[6]", "+I[3]", "+I[5]", "+I[3]") + .consumedAfterRestore("+I[4]", "+I[10]", "+I[11]") + .build()) + .runSql( + "insert into sink_t SELECT `int` FROM source_t order by rowtime, `double`") + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java deleted file mode 100644 index fe623816f1a..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalSortJsonITCase.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.runtime.stream.jsonplan; - -import org.apache.flink.table.planner.factories.TestValuesTableFactory; -import org.apache.flink.table.planner.runtime.utils.TestData; -import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; -import org.apache.flink.table.planner.utils.JsonPlanTestBase; - -import org.junit.jupiter.api.Test; - -import java.util.Arrays; -import java.util.HashMap; - -import static org.assertj.core.api.Assertions.assertThat; - -/** Test for temporal sort json plan. */ -class TemporalSortJsonITCase extends JsonPlanTestBase { - - @Test - void testSortProcessingTime() throws Exception { - createTestValuesSourceTable( - "MyTable", - JavaScalaConversionUtil.toJava(TestData.smallData3()), - "a INT", - "b BIGINT", - "c STRING", - "proctime as PROCTIME()"); - createTestValuesSinkTable("MySink", "a INT"); - compileSqlAndExecutePlan("insert into MySink SELECT a FROM MyTable order by proctime") - .await(); - - assertResult( - Arrays.asList("+I[1]", "+I[2]", "+I[3]"), - TestValuesTableFactory.getResultsAsStrings("MySink")); - } - - @Test - void testSortRowTime() throws Exception { - createTestValuesSourceTable( - "MyTable", - JavaScalaConversionUtil.toJava(TestData.windowDataWithTimestamp()), - new String[] { - "ts STRING", - "`int` INT", - "`double` DOUBLE", - "`float` FLOAT", - "`bigdec` DECIMAL(10, 2)", - "`string` STRING", - "`name` STRING", - "`rowtime` AS TO_TIMESTAMP(`ts`)", - "WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND", - }, - new HashMap<String, String>() { - { - put("enable-watermark-push-down", "true"); - put("failing-source", "true"); - } - }); - createTestValuesSinkTable("MySink", "`int` INT"); - compileSqlAndExecutePlan( - "insert into MySink SELECT `int` FROM MyTable order by rowtime, `double`") - .await(); - - assertThat(TestValuesTableFactory.getResultsAsStrings("MySink")) - .isEqualTo( - Arrays.asList( - "+I[1]", "+I[2]", "+I[2]", "+I[5]", "+I[6]", "+I[3]", "+I[3]", - "+I[4]", "+I[7]", "+I[1]")); - } -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortProcessingTime.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-proctime/plan/temporal-sort-proctime.json similarity index 53% rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortProcessingTime.out rename to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-proctime/plan/temporal-sort-proctime.json index bb931b580c5..733ce6266f7 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortProcessingTime.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-proctime/plan/temporal-sort-proctime.json @@ -1,11 +1,11 @@ { - "flinkVersion" : "", + "flinkVersion" : "1.19", "nodes" : [ { "id" : 1, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", + "identifier" : "`default_catalog`.`default_database`.`source_t`", "resolvedTable" : { "schema" : { "columns" : [ { @@ -17,22 +17,6 @@ }, { "name" : "c", "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime", - "kind" : "COMPUTED", - "expression" : { - "rexNode" : { - "kind" : "CALL", - "internalName" : "$TO_TIMESTAMP$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "VARCHAR(2147483647)" - } ], - "type" : "TIMESTAMP(3)" - }, - "serializableString" : "TO_TIMESTAMP(`c`)" - } }, { "name" : "proctime", "kind" : "COMPUTED", @@ -51,46 +35,23 @@ "serializableString" : "PROCTIME()" } } ], - "watermarkSpecs" : [ { - "rowtimeAttribute" : "rowtime", - "expression" : { - "rexNode" : { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$-$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : "TIMESTAMP(3)" - }, { - "kind" : "LITERAL", - "value" : "1000", - "type" : "INTERVAL SECOND(6) NOT NULL" - } ], - "type" : "TIMESTAMP(3)" - }, - "serializableString" : "`rowtime` - INTERVAL '1' SECOND" - } - } ] + "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values" - } + "partitionKeys" : [ ] } }, "abilities" : [ { "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 2 ] ], - "producedType" : "ROW<`a` INT, `c` VARCHAR(2147483647)> NOT NULL" + "projectedFields" : [ [ 0 ] ], + "producedType" : "ROW<`a` INT> NOT NULL" }, { "type" : "ReadingMetadata", "metadataKeys" : [ ], - "producedType" : "ROW<`a` INT, `c` VARCHAR(2147483647)> NOT NULL" + "producedType" : "ROW<`a` INT> NOT NULL" } ] }, - "outputType" : "ROW<`a` INT, `c` VARCHAR(2147483647)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c], metadata=[]]], fields=[a, c])", + "outputType" : "ROW<`a` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[a], metadata=[]]], fields=[a])", "inputProperties" : [ ] }, { "id" : 2, @@ -109,19 +70,6 @@ "precision" : 3, "kind" : "PROCTIME" } - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "VARCHAR(2147483647)" - }, { - "kind" : "CALL", - "internalName" : "$TO_TIMESTAMP$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "VARCHAR(2147483647)" - } ], - "type" : "TIMESTAMP(3)" } ], "condition" : null, "inputProperties" : [ { @@ -144,69 +92,11 @@ "precision" : 3, "kind" : "PROCTIME" } - }, { - "name" : "c", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime", - "fieldType" : "TIMESTAMP(3)" } ] }, - "description" : "Calc(select=[a, PROCTIME() AS proctime, c, TO_TIMESTAMP(c) AS rowtime])" + "description" : "Calc(select=[a, PROCTIME() AS proctime])" }, { "id" : 3, - "type" : "stream-exec-watermark-assigner_1", - "watermarkExpr" : { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$-$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : "TIMESTAMP(3)" - }, { - "kind" : "LITERAL", - "value" : "1000", - "type" : "INTERVAL SECOND(6) NOT NULL" - } ], - "type" : "TIMESTAMP(3)" - }, - "rowtimeFieldIndex" : 3, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "a", - "fieldType" : "INT" - }, { - "name" : "proctime", - "fieldType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - }, { - "name" : "c", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" - }, { - "id" : 4, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { @@ -228,31 +118,17 @@ "precision" : 3, "kind" : "PROCTIME" } - }, { - "name" : "c", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } } ] }, "description" : "Exchange(distribution=[single])" }, { - "id" : 5, + "id" : 4, "type" : "stream-exec-temporal-sort_1", "orderBy" : { "fields" : [ { "index" : 1, "isAscending" : true, "nullIsLast" : false - }, { - "index" : 2, - "isAscending" : true, - "nullIsLast" : false } ] }, "inputProperties" : [ { @@ -275,21 +151,11 @@ "precision" : 3, "kind" : "PROCTIME" } - }, { - "name" : "c", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } } ] }, - "description" : "TemporalSort(orderBy=[proctime ASC, c ASC])" + "description" : "TemporalSort(orderBy=[proctime ASC])" }, { - "id" : 6, + "id" : 5, "type" : "stream-exec-calc_1", "projection" : [ { "kind" : "INPUT_REF", @@ -307,7 +173,7 @@ "outputType" : "ROW<`a` INT>", "description" : "Calc(select=[a])" }, { - "id" : 7, + "id" : 6, "type" : "stream-exec-sink_1", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", @@ -318,7 +184,7 @@ }, "dynamicTableSink" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", + "identifier" : "`default_catalog`.`default_database`.`sink_t`", "resolvedTable" : { "schema" : { "columns" : [ { @@ -327,10 +193,7 @@ } ], "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values" - } + "partitionKeys" : [ ] } } }, @@ -343,7 +206,7 @@ "priority" : 0 } ], "outputType" : "ROW<`a` INT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a])" + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a])" } ], "edges" : [ { "source" : 1, @@ -380,12 +243,5 @@ "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" - }, { - "source" : 6, - "target" : 7, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" } ] -} +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-proctime/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-proctime/savepoint/_metadata new file mode 100644 index 00000000000..4bfe20a0264 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-proctime/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortRowTime.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-rowtime/plan/temporal-sort-rowtime.json similarity index 70% rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortRowTime.out rename to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-rowtime/plan/temporal-sort-rowtime.json index 8db2bd2bfe3..345f9904a82 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortJsonPlanTest_jsonplan/testSortRowTime.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-rowtime/plan/temporal-sort-rowtime.json @@ -1,22 +1,22 @@ { - "flinkVersion" : "", + "flinkVersion" : "1.19", "nodes" : [ { - "id" : 1, + "id" : 7, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", + "identifier" : "`default_catalog`.`default_database`.`source_t`", "resolvedTable" : { "schema" : { "columns" : [ { - "name" : "a", - "dataType" : "INT" + "name" : "ts", + "dataType" : "VARCHAR(2147483647)" }, { - "name" : "b", - "dataType" : "BIGINT" + "name" : "int", + "dataType" : "INT" }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" + "name" : "double", + "dataType" : "DOUBLE" }, { "name" : "rowtime", "kind" : "COMPUTED", @@ -26,29 +26,12 @@ "internalName" : "$TO_TIMESTAMP$1", "operands" : [ { "kind" : "INPUT_REF", - "inputIndex" : 2, + "inputIndex" : 0, "type" : "VARCHAR(2147483647)" } ], "type" : "TIMESTAMP(3)" }, - "serializableString" : "TO_TIMESTAMP(`c`)" - } - }, { - "name" : "proctime", - "kind" : "COMPUTED", - "expression" : { - "rexNode" : { - "kind" : "CALL", - "internalName" : "$PROCTIME$1", - "operands" : [ ], - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - }, - "serializableString" : "PROCTIME()" + "serializableString" : "TO_TIMESTAMP(`ts`)" } } ], "watermarkSpecs" : [ { @@ -73,45 +56,33 @@ } } ] }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values" - } + "partitionKeys" : [ ] } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 2 ] ], - "producedType" : "ROW<`a` INT, `c` VARCHAR(2147483647)> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`a` INT, `c` VARCHAR(2147483647)> NOT NULL" - } ] + } }, - "outputType" : "ROW<`a` INT, `c` VARCHAR(2147483647)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c], metadata=[]]], fields=[a, c])", + "outputType" : "ROW<`ts` VARCHAR(2147483647), `int` INT, `double` DOUBLE>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[ts, int, double])", "inputProperties" : [ ] }, { - "id" : 2, + "id" : 8, "type" : "stream-exec-calc_1", "projection" : [ { "kind" : "INPUT_REF", - "inputIndex" : 0, + "inputIndex" : 1, "type" : "INT" }, { "kind" : "CALL", "internalName" : "$TO_TIMESTAMP$1", "operands" : [ { "kind" : "INPUT_REF", - "inputIndex" : 1, + "inputIndex" : 0, "type" : "VARCHAR(2147483647)" } ], "type" : "TIMESTAMP(3)" }, { "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "VARCHAR(2147483647)" + "inputIndex" : 2, + "type" : "DOUBLE" } ], "condition" : null, "inputProperties" : [ { @@ -121,10 +92,10 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` INT, `rowtime` TIMESTAMP(3), `c` VARCHAR(2147483647)>", - "description" : "Calc(select=[a, TO_TIMESTAMP(c) AS rowtime, c])" + "outputType" : "ROW<`int` INT, `rowtime` TIMESTAMP(3), `double` DOUBLE>", + "description" : "Calc(select=[int, TO_TIMESTAMP(ts) AS rowtime, double])" }, { - "id" : 3, + "id" : 9, "type" : "stream-exec-watermark-assigner_1", "watermarkExpr" : { "kind" : "CALL", @@ -152,7 +123,7 @@ "outputType" : { "type" : "ROW", "fields" : [ { - "name" : "a", + "name" : "int", "fieldType" : "INT" }, { "name" : "rowtime", @@ -162,13 +133,13 @@ "kind" : "ROWTIME" } }, { - "name" : "c", - "fieldType" : "VARCHAR(2147483647)" + "name" : "double", + "fieldType" : "DOUBLE" } ] }, "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" }, { - "id" : 4, + "id" : 10, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { @@ -180,7 +151,7 @@ "outputType" : { "type" : "ROW", "fields" : [ { - "name" : "a", + "name" : "int", "fieldType" : "INT" }, { "name" : "rowtime", @@ -190,13 +161,13 @@ "kind" : "ROWTIME" } }, { - "name" : "c", - "fieldType" : "VARCHAR(2147483647)" + "name" : "double", + "fieldType" : "DOUBLE" } ] }, "description" : "Exchange(distribution=[single])" }, { - "id" : 5, + "id" : 11, "type" : "stream-exec-temporal-sort_1", "orderBy" : { "fields" : [ { @@ -219,7 +190,7 @@ "outputType" : { "type" : "ROW", "fields" : [ { - "name" : "a", + "name" : "int", "fieldType" : "INT" }, { "name" : "rowtime", @@ -229,13 +200,13 @@ "kind" : "ROWTIME" } }, { - "name" : "c", - "fieldType" : "VARCHAR(2147483647)" + "name" : "double", + "fieldType" : "DOUBLE" } ] }, - "description" : "TemporalSort(orderBy=[rowtime ASC, c ASC])" + "description" : "TemporalSort(orderBy=[rowtime ASC, double ASC])" }, { - "id" : 6, + "id" : 12, "type" : "stream-exec-calc_1", "projection" : [ { "kind" : "INPUT_REF", @@ -250,10 +221,10 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` INT>", - "description" : "Calc(select=[a])" + "outputType" : "ROW<`int` INT>", + "description" : "Calc(select=[int])" }, { - "id" : 7, + "id" : 13, "type" : "stream-exec-sink_1", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", @@ -264,7 +235,7 @@ }, "dynamicTableSink" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", + "identifier" : "`default_catalog`.`default_database`.`sink_t`", "resolvedTable" : { "schema" : { "columns" : [ { @@ -273,10 +244,7 @@ } ], "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values" - } + "partitionKeys" : [ ] } } }, @@ -288,50 +256,50 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` INT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a])" + "outputType" : "ROW<`int` INT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[int])" } ], "edges" : [ { - "source" : 1, - "target" : 2, + "source" : 7, + "target" : 8, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 2, - "target" : 3, + "source" : 8, + "target" : 9, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 3, - "target" : 4, + "source" : 9, + "target" : 10, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 4, - "target" : 5, + "source" : 10, + "target" : 11, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 5, - "target" : 6, + "source" : 11, + "target" : 12, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 6, - "target" : 7, + "source" : 12, + "target" : 13, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" } ] -} +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-rowtime/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-rowtime/savepoint/_metadata new file mode 100644 index 00000000000..9dd97efe2dd Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-sort_1/temporal-sort-rowtime/savepoint/_metadata differ