This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c233ed2599188ba63e361b1b4525d9f322965f65 Author: bvarghese1 <[email protected]> AuthorDate: Wed Dec 27 11:18:35 2023 -0800 [FLINK-33518] Remove WatermarkAssigner JSON Plan & IT tests - The json tests are covered by the newly introduced restore tests --- .../exec/stream/WatermarkAssignerJsonPlanTest.java | 66 -------- .../jsonplan/WatermarkAssignerJsonPlanITCase.java | 135 --------------- .../testWatermarkAssigner.out | 181 --------------------- 3 files changed, 382 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerJsonPlanTest.java deleted file mode 100644 index a8690eec56f..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerJsonPlanTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream; - -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.planner.utils.StreamTableTestUtil; -import org.apache.flink.table.planner.utils.TableTestBase; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** Test json serialization/deserialization for watermark assigner. */ -class WatermarkAssignerJsonPlanTest extends TableTestBase { - - private StreamTableTestUtil util; - private TableEnvironment tEnv; - - @BeforeEach - void setup() { - util = streamTestUtil(TableConfig.getDefault()); - tEnv = util.getTableEnv(); - } - - @Test - void testWatermarkAssigner() { - String srcTableDdl = - "CREATE TABLE WatermarkTable (\n" - + " a bigint,\n" - + " b int,\n" - + " c timestamp(3),\n" - + " watermark for c as c - interval '5' second\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'bounded' = 'false'," - + " 'enable-watermark-push-down' = 'false'," - + " 'disable-lookup' = 'true')"; - tEnv.executeSql(srcTableDdl); - String sinkTableDdl = - "CREATE TABLE sink (\n" - + " a bigint,\n" - + " b int,\n" - + " c timestamp(3)\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan("insert into sink select * from WatermarkTable"); - } -} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WatermarkAssignerJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WatermarkAssignerJsonPlanITCase.java deleted file mode 100644 index bfc32e880a5..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WatermarkAssignerJsonPlanITCase.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.runtime.stream.jsonplan; - -import org.apache.flink.table.planner.runtime.utils.TestData; -import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; -import org.apache.flink.table.planner.utils.JsonPlanTestBase; - -import org.junit.jupiter.api.Test; - -import java.io.File; -import java.util.Arrays; -import java.util.HashMap; - -import static org.apache.flink.table.utils.DateTimeUtils.toLocalDateTime; - -/** Test for watermark assigner json plan. */ -class WatermarkAssignerJsonPlanITCase extends JsonPlanTestBase { - - @Test - void testWatermarkAssigner() throws Exception { - createTestValuesSourceTable( - "MyTable", - JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()), - new String[] { - "a int", - "b bigint", - "c varchar", - "ts timestamp(3)", - "watermark for ts as ts - interval '5' second" - }, - new HashMap<String, String>() { - { - put("enable-watermark-push-down", "false"); - } - }); - - File sinkPath = createTestCsvSinkTable("MySink", "a int", "b bigint", "ts timestamp(3)"); - - compileSqlAndExecutePlan("insert into MySink select a, b, ts from MyTable where b = 3") - .await(); - - assertResult( - Arrays.asList( - "4,3," + toLocalDateTime(4000L), - "5,3," + toLocalDateTime(5000L), - "6,3," + toLocalDateTime(6000L)), - sinkPath); - } - - @Test - void testWatermarkPushDownWithMetadata() throws Exception { - // to verify FLINK-30598: the case declares metadata field first, without the fix it'll get - // wrong code generated by WatermarkGeneratorCodeGenerator which reference the incorrect - // varchar column as the watermark field. - createTestValuesSourceTable( - "MyTable", - JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()), - new String[] { - "ts timestamp(3) metadata", - "a int", - "b bigint", - "c varchar", - "watermark for ts as ts - interval '5' second" - }, - new HashMap<String, String>() { - { - put("enable-watermark-push-down", "true"); - put("readable-metadata", "ts:timestamp(3)"); - } - }); - - File sinkPath = - createTestCsvSinkTable( - "MySink", "a int", "b bigint", "c varchar", "ts timestamp(3)"); - - compileSqlAndExecutePlan("insert into MySink select a, b, c, ts from MyTable where b = 3") - .await(); - - assertResult( - Arrays.asList( - "4,3,Hello world, how are you?," + toLocalDateTime(4000L), - "5,3,I am fine.," + toLocalDateTime(5000L), - "6,3,Luke Skywalker," + toLocalDateTime(6000L)), - sinkPath); - } - - @Test - void testWatermarkAndProjectPushDownWithMetadata() throws Exception { - createTestValuesSourceTable( - "MyTable", - JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()), - new String[] { - "ts timestamp(3) metadata", - "a int", - "b bigint", - "c varchar", - "watermark for ts as ts - interval '5' second" - }, - new HashMap<String, String>() { - { - put("enable-watermark-push-down", "true"); - put("readable-metadata", "ts:timestamp(3)"); - } - }); - - File sinkPath = createTestCsvSinkTable("MySink", "c varchar", "ts timestamp(3)"); - - compileSqlAndExecutePlan("insert into MySink select c, ts from MyTable where b = 3") - .await(); - - assertResult( - Arrays.asList( - "Hello world, how are you?," + toLocalDateTime(4000L), - "I am fine.," + toLocalDateTime(5000L), - "Luke Skywalker," + toLocalDateTime(6000L)), - sinkPath); - } -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerJsonPlanTest_jsonplan/testWatermarkAssigner.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerJsonPlanTest_jsonplan/testWatermarkAssigner.out deleted file mode 100644 index 162b6b74d0c..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerJsonPlanTest_jsonplan/testWatermarkAssigner.out +++ /dev/null @@ -1,181 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`WatermarkTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT" - }, { - "name" : "c", - "dataType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "watermarkSpecs" : [ { - "rowtimeAttribute" : "c", - "expression" : { - "rexNode" : { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$-$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "TIMESTAMP(3)" - }, { - "kind" : "LITERAL", - "value" : "5000", - "type" : "INTERVAL SECOND(6) NOT NULL" - } ], - "type" : "TIMESTAMP(3)" - }, - "serializableString" : "`c` - INTERVAL '5' SECOND" - } - } ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values", - "disable-lookup" : "true", - "enable-watermark-push-down" : "false" - } - } - } - }, - "outputType" : "ROW<`a` BIGINT, `b` INT, `c` TIMESTAMP(3)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, WatermarkTable]], fields=[a, b, c])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-watermark-assigner_1", - "watermarkExpr" : { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$-$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "TIMESTAMP(3)" - }, { - "kind" : "LITERAL", - "value" : "5000", - "type" : "INTERVAL SECOND(6) NOT NULL" - } ], - "type" : "TIMESTAMP(3)" - }, - "rowtimeFieldIndex" : 2, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "a", - "fieldType" : "BIGINT" - }, { - "name" : "b", - "fieldType" : "INT" - }, { - "name" : "c", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "WatermarkAssigner(rowtime=[c], watermark=[(c - 5000:INTERVAL SECOND)])" - }, { - "id" : 3, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`sink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT" - }, { - "name" : "c", - "dataType" : "TIMESTAMP(3)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT" ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "a", - "fieldType" : "BIGINT" - }, { - "name" : "b", - "fieldType" : "INT" - }, { - "name" : "c", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "Sink(table=[default_catalog.default_database.sink], fields=[a, b, c])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -}
