This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7542b56f2ab [FLINK-33469] Implement restore tests for Limit node (#23675)
7542b56f2ab is described below

commit 7542b56f2abb860f42a83c4687f6e38bb82b78c6
Author: James Hughes <[email protected]>
AuthorDate: Thu Nov 9 10:49:15 2023 -0500

    [FLINK-33469] Implement restore tests for Limit node (#23675)
---
 .../plan/nodes/exec/stream/LimitJsonPlanTest.java  |  66 -----------------
 .../plan/nodes/exec/stream/LimitRestoreTest.java   |  38 ++++++++++
 .../plan/nodes/exec/stream/LimitTestPrograms.java  |  60 ++++++++++++++++
 .../stream/jsonplan/LimitJsonPlanITCase.java       |  50 -------------
 .../stream-exec-limit_1/limit/plan/limit.json}     |  79 ++++++++++-----------
 .../stream-exec-limit_1/limit/savepoint/_metadata  | Bin 0 -> 11422 bytes
 6 files changed, 134 insertions(+), 159 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitJsonPlanTest.java
deleted file mode 100644
index 062edcaff15..00000000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitJsonPlanTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.Before;
-import org.junit.Test;
-
-/** Test json serialization for sort limit. */
-public class LimitJsonPlanTest extends TableTestBase {
-
-    private StreamTableTestUtil util;
-    private TableEnvironment tEnv;
-
-    @Before
-    public void setup() {
-        util = streamTestUtil(TableConfig.getDefault());
-        tEnv = util.getTableEnv();
-
-        String srcTableDdl =
-                "CREATE TABLE MyTable (\n"
-                        + "  a bigint,\n"
-                        + "  b int not null,\n"
-                        + "  c varchar,\n"
-                        + "  d timestamp(3)\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'bounded' = 'false')";
-        tEnv.executeSql(srcTableDdl);
-    }
-
-    @Test
-    public void testLimit() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a bigint,\n"
-                        + "  b bigint\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'sink-insert-only' = 'false',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-        String sql = "insert into MySink SELECT a, a from MyTable limit 10";
-        util.verifyJsonPlan(sql);
-    }
-}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitRestoreTest.java
new file mode 100644
index 00000000000..9d6180064ff
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitRestoreTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecLimit}. */
+public class LimitRestoreTest extends RestoreTestBase {
+
+    public LimitRestoreTest() {
+        super(StreamExecLimit.class);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return Arrays.asList(LimitTestPrograms.LIMIT);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitTestPrograms.java
new file mode 100644
index 00000000000..0fc8a44695f
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitTestPrograms.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/**
+ * {@link TableTestProgram} definitions for testing {@link
+ * org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLimit}.
+ */
+public class LimitTestPrograms {
+
+    static final Row[] DATA1 =
+            new Row[] {
+                Row.of(2, "a", 6),
+                Row.of(4, "b", 8),
+                Row.of(6, "c", 10),
+                Row.of(1, "a", 5),
+                Row.of(3, "b", 7),
+                Row.of(5, "c", 9)
+            };
+
+    static final Row[] DATA2 = new Row[] {Row.of(8, "d", 3), Row.of(7, "e", 2)};
+    static final TableTestProgram LIMIT =
+            TableTestProgram.of("limit", "validates limit node")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a INT", "b VARCHAR", "c INT")
+                                    .producedBeforeRestore(DATA1)
+                                    .producedAfterRestore(DATA2)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a INT", "b VARCHAR", "c BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[2, a, 6]", "+I[4, b, 8]", "+I[6, c, 10]")
+                                    .consumedAfterRestore(new String[] {})
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT * from source_t LIMIT 3")
+                    .build();
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LimitJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LimitJsonPlanITCase.java
deleted file mode 100644
index 1de8f66bd56..00000000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LimitJsonPlanITCase.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.runtime.stream.jsonplan;
-
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.table.planner.runtime.utils.TestData;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-/** Test for limit JsonPlan ser/de. */
-class LimitJsonPlanITCase extends JsonPlanTestBase {
-    @Test
-    void testLimit() throws ExecutionException, InterruptedException, IOException {
-        createTestValuesSourceTable(
-                "MyTable",
-                JavaScalaConversionUtil.toJava(TestData.data1()),
-                "a int",
-                "b varchar",
-                "c int");
-        createTestNonInsertOnlyValuesSinkTable("`result`", "a int", "b varchar", "c bigint");
-        String sql = "insert into `result` select * from MyTable limit 3";
-        compileSqlAndExecutePlan(sql).await();
-
-        List<String> expected = Arrays.asList("+I[2, a, 6]", "+I[4, b, 8]", "+I[6, c, 10]");
-        assertResult(expected, TestValuesTableFactory.getResultsAsStrings("result"));
-    }
-}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitJsonPlanTest_jsonplan/testLimit.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-limit_1/limit/plan/limit.json
similarity index 70%
rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitJsonPlanTest_jsonplan/testLimit.out
rename to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-limit_1/limit/plan/limit.json
index ebce53e07cc..15cb677b4e4 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitJsonPlanTest_jsonplan/testLimit.out
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-limit_1/limit/plan/limit.json
@@ -1,50 +1,35 @@
 {
-  "flinkVersion" : "",
+  "flinkVersion" : "1.19",
   "nodes" : [ {
     "id" : 1,
     "type" : "stream-exec-table-source-scan_1",
     "scanTableSource" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT NOT NULL"
-            }, {
-              "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
             }, {
-              "name" : "d",
-              "dataType" : "TIMESTAMP(3)"
+              "name" : "c",
+              "dataType" : "INT"
             } ],
             "watermarkSpecs" : [ ]
           },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
+          "partitionKeys" : [ ]
         }
       },
       "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ] ],
-        "producedType" : "ROW<`a` BIGINT> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`a` BIGINT> NOT NULL"
-      }, {
         "type" : "LimitPushDown",
-        "limit" : 10
+        "limit" : 3
       } ]
     },
-    "outputType" : "ROW<`a` BIGINT>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a], metadata=[], limit=[10]]], fields=[a])",
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, limit=[3]]], fields=[a, b, c])",
     "inputProperties" : [ ]
   }, {
     "id" : 2,
@@ -56,7 +41,7 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT>",
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
     "description" : "Exchange(distribution=[single])"
   }, {
     "id" : 3,
@@ -67,7 +52,7 @@
     "rankRange" : {
       "type" : "Constant",
       "start" : 1,
-      "end" : 10
+      "end" : 3
     },
     "rankStrategy" : {
       "type" : "AppendFast"
@@ -85,8 +70,8 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT>",
-    "description" : "Limit(offset=[0], fetch=[10])",
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "Limit(offset=[0], fetch=[3])",
     "rankType" : "ROW_NUMBER",
     "partition" : {
       "fields" : [ ]
@@ -101,10 +86,20 @@
     "projection" : [ {
       "kind" : "INPUT_REF",
       "inputIndex" : 0,
-      "type" : "BIGINT"
+      "type" : "INT"
     }, {
       "kind" : "INPUT_REF",
-      "inputIndex" : 0,
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "INT"
+      } ],
       "type" : "BIGINT"
     } ],
     "condition" : null,
@@ -115,8 +110,8 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `a0` BIGINT>",
-    "description" : "Calc(select=[a, a AS a0])"
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+    "description" : "Calc(select=[a, b, CAST(c AS BIGINT) AS c])"
   }, {
     "id" : 5,
     "type" : "stream-exec-sink_1",
@@ -129,24 +124,22 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
               "dataType" : "BIGINT"
             } ],
             "watermarkSpecs" : [ ]
           },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false",
-            "table-sink-class" : "DEFAULT"
-          }
+          "partitionKeys" : [ ]
         }
       }
     },
@@ -158,8 +151,8 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `a0` BIGINT>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, a0])"
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])"
   } ],
   "edges" : [ {
     "source" : 1,
@@ -190,4 +183,4 @@
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-limit_1/limit/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-limit_1/limit/savepoint/_metadata
new file mode 100644
index 00000000000..c29a96bc807
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-limit_1/limit/savepoint/_metadata differ

Reply via email to