This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 32d31cb8b838451182d1949414b73ed585b13336 Author: bvarghese1 <[email protected]> AuthorDate: Fri Nov 3 11:03:06 2023 -0700 [FLINK-33455] Implement restore tests for SortLimit node This closes #23660 --- .../nodes/exec/stream/SortLimitRestoreTest.java | 38 +++++ .../plan/nodes/exec/stream/SortTestPrograms.java | 126 ++++++++++++++ .../sort-limit-asc/plan/sort-limit-asc.json | 186 +++++++++++++++++++++ .../sort-limit-asc/savepoint/_metadata | Bin 0 -> 12105 bytes .../sort-limit-desc/plan/sort-limit-desc.json | 186 +++++++++++++++++++++ .../sort-limit-desc/savepoint/_metadata | Bin 0 -> 12105 bytes 6 files changed, 536 insertions(+) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortLimitRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortLimitRestoreTest.java new file mode 100644 index 00000000000..58e53435318 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortLimitRestoreTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.flink.table.planner.plan.nodes.exec.stream; + + import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; + import org.apache.flink.table.test.program.TableTestProgram; + + import java.util.Arrays; + import java.util.List; + + /** + * Restore tests for {@link StreamExecSortLimit}, exercising the ascending and descending + * programs defined in {@link SortTestPrograms}. + */ + public class SortLimitRestoreTest extends RestoreTestBase { + + /** Creates the restore test for the {@link StreamExecSortLimit} exec node. */ + public SortLimitRestoreTest() { + super(StreamExecSortLimit.class); + } + + /** Returns the ascending and descending sort-limit programs, in that order. */ + @Override + public List<TableTestProgram> programs() { + return Arrays.asList(SortTestPrograms.SORT_LIMIT_ASC, SortTestPrograms.SORT_LIMIT_DESC); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortTestPrograms.java new file mode 100644 index 00000000000..0a6f68d4e76 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortTestPrograms.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + + package org.apache.flink.table.planner.plan.nodes.exec.stream; + + import org.apache.flink.table.test.program.SinkTestStep; + import org.apache.flink.table.test.program.SourceTestStep; + import org.apache.flink.table.test.program.TableTestProgram; + import org.apache.flink.types.Row; + + /** + * {@link TableTestProgram} definitions for testing {@link + * org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSortLimit}. + */ + public class SortTestPrograms { + + /** Rows produced before restore; shared by both programs below. */ + static final Row[] DATA = { + Row.of(2, "a", 6), + Row.of(4, "b", 8), + Row.of(6, "c", 10), + Row.of(1, "a", 5), + Row.of(3, "b", 7), + Row.of(5, "c", 9) + }; + + /** Keeps the three smallest rows by {@code a} ({@code ORDER BY a LIMIT 3}). */ + static final TableTestProgram SORT_LIMIT_ASC = + TableTestProgram.of( + "sort-limit-asc", + "validates sort limit node by sorting integers in asc mode") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema("a INT", "b VARCHAR", "c INT") + .producedBeforeRestore(DATA) + .producedAfterRestore( + // replaces (3, b, 7), the largest row retained before restore + Row.of(2, "a", 6), + // ignored since greater than the largest retained row (2, a, 6) + Row.of(4, "b", 8), + // replaces one of the two (2, a, 6) rows, now the largest retained + Row.of(1, "a", 5)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("a INT", "b VARCHAR", "c BIGINT") + // ORDER BY a LIMIT 3 retains the three smallest rows. The retained set + // after the before-restore data is drawn below as a max heap (largest + // row at the root); consumedBeforeRestore lists every insertion and + // deletion emitted on the way there. + // [3, b, 7] + // [2, a, 6] [1, a, 5] + .consumedBeforeRestore( + "+I[2, a, 6]", + "+I[4, b, 8]", + "+I[6, c, 10]", + "-D[6, c, 10]", + "+I[1, a, 5]", + "-D[4, b, 8]", + "+I[3, b, 7]") + // After restore, three rows from DATA are produced again; the retained + // set is restored from the savepoint and then updated.
+ // The final retained set is drawn below; consumedAfterRestore lists every + // insertion and deletion emitted after restore. + // [2, a, 6] + // [1, a, 5] [1, a, 5] + .consumedAfterRestore( + "-D[3, b, 7]", + "+I[2, a, 6]", + "-D[2, a, 6]", + "+I[1, a, 5]") + .build()) + .runSql("INSERT INTO sink_t SELECT * from source_t ORDER BY a LIMIT 3") + .build(); + + /** Keeps the three largest rows by {@code a} ({@code ORDER BY a DESC LIMIT 3}). */ + static final TableTestProgram SORT_LIMIT_DESC = + TableTestProgram.of( + "sort-limit-desc", + "validates sort limit node by sorting integers in desc mode") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema("a INT", "b VARCHAR", "c INT") + .producedBeforeRestore(DATA) + .producedAfterRestore( + // ignored since smaller than the smallest retained row (4, b, 8) + Row.of(2, "a", 6), + // replaces (4, b, 8), the smallest row retained before restore + Row.of(6, "c", 10), + // ignored since not larger than the smallest retained row (5, c, 9) + Row.of(5, "c", 9)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("a INT", "b VARCHAR", "c BIGINT") + // retained set after the before-restore data, drawn as a min heap + // (smallest row at the root) + // [4, b, 8] + // [5, c, 9] [6, c, 10] + .consumedBeforeRestore( + "+I[2, a, 6]", + "+I[4, b, 8]", + "+I[6, c, 10]", + "-D[2, a, 6]", + "+I[3, b, 7]", + "-D[3, b, 7]", + "+I[5, c, 9]") + // retained set after restore, drawn as a min heap + // [5, c, 9] + // [6, c, 10] [6, c, 10] + .consumedAfterRestore("-D[4, b, 8]", "+I[6, c, 10]") + .build()) + .runSql("INSERT INTO sink_t SELECT * from source_t ORDER BY a DESC LIMIT 3") + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-asc/plan/sort-limit-asc.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-asc/plan/sort-limit-asc.json new file mode 100644 index 00000000000..7afa8612671 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-asc/plan/sort-limit-asc.json @@ -0,0 +1,186 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 1, + "type" :
"stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "c", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "SINGLETON" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>", + "description" : "Exchange(distribution=[single])" + }, { + "id" : 3, + "type" : "stream-exec-sort-limit_1", + "configuration" : { + "table.exec.rank.topn-cache-size" : "10000" + }, + "orderBy" : { + "fields" : [ { + "index" : 0, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "rankRange" : { + "type" : "Constant", + "start" : 1, + "end" : 3 + }, + "rankStrategy" : { + "type" : "AppendFast" + }, + "generateUpdateBefore" : true, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "rankState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>", + "description" : "SortLimit(orderBy=[a ASC], offset=[0], fetch=[3], strategy=[AppendFastStrategy])", + "rankType" : "ROW_NUMBER", + "partition" : { + "fields" : [ ] + }, + "outputRowNumber" : false + }, { + "id" : 4, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : 
"VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>", + "description" : "Calc(select=[a, b, CAST(c AS BIGINT) AS c])" + }, { + "id" : 5, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "c", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + 
"type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-asc/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-asc/savepoint/_metadata new file mode 100644 index 00000000000..9af9b25c793 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-asc/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-desc/plan/sort-limit-desc.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-desc/plan/sort-limit-desc.json new file mode 100644 index 00000000000..2ed21e49348 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-desc/plan/sort-limit-desc.json @@ -0,0 +1,186 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "c", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "SINGLETON" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` 
VARCHAR(2147483647), `c` INT>", + "description" : "Exchange(distribution=[single])" + }, { + "id" : 3, + "type" : "stream-exec-sort-limit_1", + "configuration" : { + "table.exec.rank.topn-cache-size" : "10000" + }, + "orderBy" : { + "fields" : [ { + "index" : 0, + "isAscending" : false, + "nullIsLast" : true + } ] + }, + "rankRange" : { + "type" : "Constant", + "start" : 1, + "end" : 3 + }, + "rankStrategy" : { + "type" : "AppendFast" + }, + "generateUpdateBefore" : true, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "rankState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>", + "description" : "SortLimit(orderBy=[a DESC], offset=[0], fetch=[3], strategy=[AppendFastStrategy])", + "rankType" : "ROW_NUMBER", + "partition" : { + "fields" : [ ] + }, + "outputRowNumber" : false + }, { + "id" : 4, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>", + "description" : "Calc(select=[a, b, CAST(c AS BIGINT) AS c])" + }, { + "id" : 5, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + 
"dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "c", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-desc/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-desc/savepoint/_metadata new file mode 100644 index 00000000000..dc2e54fe8f9 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-desc/savepoint/_metadata differ
