This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 32d31cb8b838451182d1949414b73ed585b13336
Author: bvarghese1 <[email protected]>
AuthorDate: Fri Nov 3 11:03:06 2023 -0700

    [FLINK-33455] Implement restore tests for SortLimit node
    
    This closes #23660
---
 .../nodes/exec/stream/SortLimitRestoreTest.java    |  38 +++++
 .../plan/nodes/exec/stream/SortTestPrograms.java   | 126 ++++++++++++++
 .../sort-limit-asc/plan/sort-limit-asc.json        | 186 +++++++++++++++++++++
 .../sort-limit-asc/savepoint/_metadata             | Bin 0 -> 12105 bytes
 .../sort-limit-desc/plan/sort-limit-desc.json      | 186 +++++++++++++++++++++
 .../sort-limit-desc/savepoint/_metadata            | Bin 0 -> 12105 bytes
 6 files changed, 536 insertions(+)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortLimitRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortLimitRestoreTest.java
new file mode 100644
index 00000000000..58e53435318
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortLimitRestoreTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecSortLimit}. */
+public class SortLimitRestoreTest extends RestoreTestBase {
+
+    public SortLimitRestoreTest() {
+        super(StreamExecSortLimit.class);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return Arrays.asList(SortTestPrograms.SORT_LIMIT_ASC, 
SortTestPrograms.SORT_LIMIT_DESC);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortTestPrograms.java
new file mode 100644
index 00000000000..0a6f68d4e76
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortTestPrograms.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/**
+ * {@link TableTestProgram} definitions for testing {@link
+ * org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSortLimit}.
+ */
+public class SortTestPrograms {
+
+    static final Row[] DATA = {
+        Row.of(2, "a", 6),
+        Row.of(4, "b", 8),
+        Row.of(6, "c", 10),
+        Row.of(1, "a", 5),
+        Row.of(3, "b", 7),
+        Row.of(5, "c", 9)
+    };
+
+    static final TableTestProgram SORT_LIMIT_ASC =
+            TableTestProgram.of(
+                            "sort-limit-asc",
+                            "validates sort limit node by sorting integers in 
asc mode")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a INT", "b VARCHAR", "c INT")
+                                    .producedBeforeRestore(DATA)
+                                    .producedAfterRestore(
+                                            // replaces (3, b, 7) from 
beforeRestore
+                                            Row.of(2, "a", 6),
+                                            // ignored since greater than (2, 
a, 6)
+                                            Row.of(4, "b", 8),
+                                            // replaces (2, a, 6) from 
beforeRestore
+                                            Row.of(1, "a", 5))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a INT", "b VARCHAR", "c 
BIGINT")
+                                    // Sort maintains a max heap of the min elements, so the
+                                    // final state of the heap after producing the test data is
+                                    // shown below, with every insertion and deletion shown in
+                                    // consumedBeforeRestore
+                                    //      [3, b, 7]
+                                    // [2, a, 6]  [1, a, 5]
+                                    .consumedBeforeRestore(
+                                            "+I[2, a, 6]",
+                                            "+I[4, b, 8]",
+                                            "+I[6, c, 10]",
+                                            "-D[6, c, 10]",
+                                            "+I[1, a, 5]",
+                                            "-D[4, b, 8]",
+                                            "+I[3, b, 7]")
+                                    // Since the same data is replayed after restore, the heap
+                                    // state is restored and updated.
+                                    // The final state of the heap is shown 
below with every
+                                    // insertion and deletion shown in the 
consumedAfterRestore
+                                    //      [2, a, 6]
+                                    // [1, a, 5]  [1, a, 5]
+                                    .consumedAfterRestore(
+                                            "-D[3, b, 7]",
+                                            "+I[2, a, 6]",
+                                            "-D[2, a, 6]",
+                                            "+I[1, a, 5]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT * from source_t ORDER 
BY a LIMIT 3")
+                    .build();
+
+    static final TableTestProgram SORT_LIMIT_DESC =
+            TableTestProgram.of(
+                            "sort-limit-desc",
+                            "validates sort limit node by sorting integers in 
desc mode")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a INT", "b VARCHAR", "c INT")
+                                    .producedBeforeRestore(DATA)
+                                    .producedAfterRestore(
+                                            // ignored since smaller than the 
least max (4, b, 8)
+                                            Row.of(2, "a", 6),
+                                            // replaces (4, b, 8) from 
beforeRestore
+                                            Row.of(6, "c", 10),
+                                            // ignored since not larger than 
the least max (5, c, 9)
+                                            Row.of(5, "c", 9))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a INT", "b VARCHAR", "c 
BIGINT")
+                                    // heap state
+                                    //      [4, b, 8]
+                                    // [5, c, 9]  [6, c, 10]
+                                    .consumedBeforeRestore(
+                                            "+I[2, a, 6]",
+                                            "+I[4, b, 8]",
+                                            "+I[6, c, 10]",
+                                            "-D[2, a, 6]",
+                                            "+I[3, b, 7]",
+                                            "-D[3, b, 7]",
+                                            "+I[5, c, 9]")
+                                    // heap state
+                                    //       [5, c, 9]
+                                    // [6, c, 10]  [6, c, 10]
+                                    .consumedAfterRestore("-D[4, b, 8]", 
"+I[6, c, 10]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT * from source_t ORDER 
BY a DESC LIMIT 3")
+                    .build();
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-asc/plan/sort-limit-asc.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-asc/plan/sort-limit-asc.json
new file mode 100644
index 00000000000..7afa8612671
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-asc/plan/sort-limit-asc.json
@@ -0,0 +1,186 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "Exchange(distribution=[single])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-sort-limit_1",
+    "configuration" : {
+      "table.exec.rank.topn-cache-size" : "10000"
+    },
+    "orderBy" : {
+      "fields" : [ {
+        "index" : 0,
+        "isAscending" : true,
+        "nullIsLast" : false
+      } ]
+    },
+    "rankRange" : {
+      "type" : "Constant",
+      "start" : 1,
+      "end" : 3
+    },
+    "rankStrategy" : {
+      "type" : "AppendFast"
+    },
+    "generateUpdateBefore" : true,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "rankState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "SortLimit(orderBy=[a ASC], offset=[0], fetch=[3], 
strategy=[AppendFastStrategy])",
+    "rankType" : "ROW_NUMBER",
+    "partition" : {
+      "fields" : [ ]
+    },
+    "outputRowNumber" : false
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+    "description" : "Calc(select=[a, b, CAST(c AS BIGINT) AS c])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-asc/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-asc/savepoint/_metadata
new file mode 100644
index 00000000000..9af9b25c793
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-asc/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-desc/plan/sort-limit-desc.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-desc/plan/sort-limit-desc.json
new file mode 100644
index 00000000000..2ed21e49348
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-desc/plan/sort-limit-desc.json
@@ -0,0 +1,186 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "Exchange(distribution=[single])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-sort-limit_1",
+    "configuration" : {
+      "table.exec.rank.topn-cache-size" : "10000"
+    },
+    "orderBy" : {
+      "fields" : [ {
+        "index" : 0,
+        "isAscending" : false,
+        "nullIsLast" : true
+      } ]
+    },
+    "rankRange" : {
+      "type" : "Constant",
+      "start" : 1,
+      "end" : 3
+    },
+    "rankStrategy" : {
+      "type" : "AppendFast"
+    },
+    "generateUpdateBefore" : true,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "rankState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "SortLimit(orderBy=[a DESC], offset=[0], fetch=[3], 
strategy=[AppendFastStrategy])",
+    "rankType" : "ROW_NUMBER",
+    "partition" : {
+      "fields" : [ ]
+    },
+    "outputRowNumber" : false
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+    "description" : "Calc(select=[a, b, CAST(c AS BIGINT) AS c])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-desc/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-desc/savepoint/_metadata
new file mode 100644
index 00000000000..dc2e54fe8f9
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sort-limit_1/sort-limit-desc/savepoint/_metadata
 differ

Reply via email to