This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7542b56f2ab [FLINK-33469] Implement restore tests for Limit node (#23675)
7542b56f2ab is described below
commit 7542b56f2abb860f42a83c4687f6e38bb82b78c6
Author: James Hughes <[email protected]>
AuthorDate: Thu Nov 9 10:49:15 2023 -0500
[FLINK-33469] Implement restore tests for Limit node (#23675)
---
.../plan/nodes/exec/stream/LimitJsonPlanTest.java | 66 -----------------
.../plan/nodes/exec/stream/LimitRestoreTest.java | 38 ++++++++++
.../plan/nodes/exec/stream/LimitTestPrograms.java | 60 ++++++++++++++++
.../stream/jsonplan/LimitJsonPlanITCase.java | 50 -------------
.../stream-exec-limit_1/limit/plan/limit.json} | 79 ++++++++++-----------
.../stream-exec-limit_1/limit/savepoint/_metadata | Bin 0 -> 11422 bytes
6 files changed, 134 insertions(+), 159 deletions(-)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitJsonPlanTest.java
deleted file mode 100644
index 062edcaff15..00000000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitJsonPlanTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.Before;
-import org.junit.Test;
-
-/** Test json serialization for sort limit. */
-public class LimitJsonPlanTest extends TableTestBase {
-
- private StreamTableTestUtil util;
- private TableEnvironment tEnv;
-
- @Before
- public void setup() {
- util = streamTestUtil(TableConfig.getDefault());
- tEnv = util.getTableEnv();
-
- String srcTableDdl =
- "CREATE TABLE MyTable (\n"
- + " a bigint,\n"
- + " b int not null,\n"
- + " c varchar,\n"
- + " d timestamp(3)\n"
- + ") with (\n"
- + " 'connector' = 'values',\n"
- + " 'bounded' = 'false')";
- tEnv.executeSql(srcTableDdl);
- }
-
- @Test
- public void testLimit() {
- String sinkTableDdl =
- "CREATE TABLE MySink (\n"
- + " a bigint,\n"
- + " b bigint\n"
- + ") with (\n"
- + " 'connector' = 'values',\n"
- + " 'sink-insert-only' = 'false',\n"
- + " 'table-sink-class' = 'DEFAULT')";
- tEnv.executeSql(sinkTableDdl);
- String sql = "insert into MySink SELECT a, a from MyTable limit 10";
- util.verifyJsonPlan(sql);
- }
-}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitRestoreTest.java
new file mode 100644
index 00000000000..9d6180064ff
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitRestoreTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecLimit}. */
+public class LimitRestoreTest extends RestoreTestBase {
+
+ public LimitRestoreTest() {
+ super(StreamExecLimit.class);
+ }
+
+ @Override
+ public List<TableTestProgram> programs() {
+ return Arrays.asList(LimitTestPrograms.LIMIT);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitTestPrograms.java
new file mode 100644
index 00000000000..0fc8a44695f
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitTestPrograms.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/**
+ * {@link TableTestProgram} definitions for testing {@link
+ * org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLimit}.
+ */
+public class LimitTestPrograms {
+
+ static final Row[] DATA1 =
+ new Row[] {
+ Row.of(2, "a", 6),
+ Row.of(4, "b", 8),
+ Row.of(6, "c", 10),
+ Row.of(1, "a", 5),
+ Row.of(3, "b", 7),
+ Row.of(5, "c", 9)
+ };
+
+ static final Row[] DATA2 = new Row[] {Row.of(8, "d", 3), Row.of(7, "e", 2)};
+ static final TableTestProgram LIMIT =
+ TableTestProgram.of("limit", "validates limit node")
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema("a INT", "b VARCHAR", "c INT")
+ .producedBeforeRestore(DATA1)
+ .producedAfterRestore(DATA2)
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("a INT", "b VARCHAR", "c BIGINT")
+ .consumedBeforeRestore(
+ "+I[2, a, 6]", "+I[4, b, 8]", "+I[6, c, 10]")
+ .consumedAfterRestore(new String[] {})
+ .build())
+ .runSql("INSERT INTO sink_t SELECT * from source_t LIMIT 3")
+ .build();
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LimitJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LimitJsonPlanITCase.java
deleted file mode 100644
index 1de8f66bd56..00000000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LimitJsonPlanITCase.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.runtime.stream.jsonplan;
-
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.table.planner.runtime.utils.TestData;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-/** Test for limit JsonPlan ser/de. */
-class LimitJsonPlanITCase extends JsonPlanTestBase {
- @Test
- void testLimit() throws ExecutionException, InterruptedException, IOException {
- createTestValuesSourceTable(
- "MyTable",
- JavaScalaConversionUtil.toJava(TestData.data1()),
- "a int",
- "b varchar",
- "c int");
- createTestNonInsertOnlyValuesSinkTable("`result`", "a int", "b varchar", "c bigint");
- String sql = "insert into `result` select * from MyTable limit 3";
- compileSqlAndExecutePlan(sql).await();
-
- List<String> expected = Arrays.asList("+I[2, a, 6]", "+I[4, b, 8]", "+I[6, c, 10]");
- assertResult(expected, TestValuesTableFactory.getResultsAsStrings("result"));
- }
-}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitJsonPlanTest_jsonplan/testLimit.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-limit_1/limit/plan/limit.json
similarity index 70%
rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitJsonPlanTest_jsonplan/testLimit.out
rename to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-limit_1/limit/plan/limit.json
index ebce53e07cc..15cb677b4e4 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitJsonPlanTest_jsonplan/testLimit.out
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-limit_1/limit/plan/limit.json
@@ -1,50 +1,35 @@
{
- "flinkVersion" : "",
+ "flinkVersion" : "1.19",
"nodes" : [ {
"id" : 1,
"type" : "stream-exec-table-source-scan_1",
"scanTableSource" : {
"table" : {
- "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
"resolvedTable" : {
"schema" : {
"columns" : [ {
"name" : "a",
- "dataType" : "BIGINT"
+ "dataType" : "INT"
}, {
"name" : "b",
- "dataType" : "INT NOT NULL"
- }, {
- "name" : "c",
"dataType" : "VARCHAR(2147483647)"
}, {
- "name" : "d",
- "dataType" : "TIMESTAMP(3)"
+ "name" : "c",
+ "dataType" : "INT"
} ],
"watermarkSpecs" : [ ]
},
- "partitionKeys" : [ ],
- "options" : {
- "bounded" : "false",
- "connector" : "values"
- }
+ "partitionKeys" : [ ]
}
},
"abilities" : [ {
- "type" : "ProjectPushDown",
- "projectedFields" : [ [ 0 ] ],
- "producedType" : "ROW<`a` BIGINT> NOT NULL"
- }, {
- "type" : "ReadingMetadata",
- "metadataKeys" : [ ],
- "producedType" : "ROW<`a` BIGINT> NOT NULL"
- }, {
"type" : "LimitPushDown",
- "limit" : 10
+ "limit" : 3
} ]
},
- "outputType" : "ROW<`a` BIGINT>",
- "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a], metadata=[], limit=[10]]], fields=[a])",
+ "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, limit=[3]]], fields=[a, b, c])",
"inputProperties" : [ ]
}, {
"id" : 2,
@@ -56,7 +41,7 @@
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
- "outputType" : "ROW<`a` BIGINT>",
+ "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
"description" : "Exchange(distribution=[single])"
}, {
"id" : 3,
@@ -67,7 +52,7 @@
"rankRange" : {
"type" : "Constant",
"start" : 1,
- "end" : 10
+ "end" : 3
},
"rankStrategy" : {
"type" : "AppendFast"
@@ -85,8 +70,8 @@
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
- "outputType" : "ROW<`a` BIGINT>",
- "description" : "Limit(offset=[0], fetch=[10])",
+ "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+ "description" : "Limit(offset=[0], fetch=[3])",
"rankType" : "ROW_NUMBER",
"partition" : {
"fields" : [ ]
@@ -101,10 +86,20 @@
"projection" : [ {
"kind" : "INPUT_REF",
"inputIndex" : 0,
- "type" : "BIGINT"
+ "type" : "INT"
}, {
"kind" : "INPUT_REF",
- "inputIndex" : 0,
+ "inputIndex" : 1,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT"
+ } ],
"type" : "BIGINT"
} ],
"condition" : null,
@@ -115,8 +110,8 @@
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
- "outputType" : "ROW<`a` BIGINT, `a0` BIGINT>",
- "description" : "Calc(select=[a, a AS a0])"
+ "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+ "description" : "Calc(select=[a, b, CAST(c AS BIGINT) AS c])"
}, {
"id" : 5,
"type" : "stream-exec-sink_1",
@@ -129,24 +124,22 @@
},
"dynamicTableSink" : {
"table" : {
- "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
"resolvedTable" : {
"schema" : {
"columns" : [ {
"name" : "a",
- "dataType" : "BIGINT"
+ "dataType" : "INT"
}, {
"name" : "b",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "c",
"dataType" : "BIGINT"
} ],
"watermarkSpecs" : [ ]
},
- "partitionKeys" : [ ],
- "options" : {
- "connector" : "values",
- "sink-insert-only" : "false",
- "table-sink-class" : "DEFAULT"
- }
+ "partitionKeys" : [ ]
}
}
},
@@ -158,8 +151,8 @@
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
- "outputType" : "ROW<`a` BIGINT, `a0` BIGINT>",
- "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, a0])"
+ "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])"
} ],
"edges" : [ {
"source" : 1,
@@ -190,4 +183,4 @@
},
"shuffleMode" : "PIPELINED"
} ]
-}
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-limit_1/limit/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-limit_1/limit/savepoint/_metadata
new file mode 100644
index 00000000000..c29a96bc807
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-limit_1/limit/savepoint/_metadata differ