This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c233ed2599188ba63e361b1b4525d9f322965f65
Author: bvarghese1 <[email protected]>
AuthorDate: Wed Dec 27 11:18:35 2023 -0800

    [FLINK-33518] Remove WatermarkAssigner JSON Plan & IT tests
    
    - The json tests are covered by the newly introduced restore tests
---
 .../exec/stream/WatermarkAssignerJsonPlanTest.java |  66 --------
 .../jsonplan/WatermarkAssignerJsonPlanITCase.java  | 135 ---------------
 .../testWatermarkAssigner.out                      | 181 ---------------------
 3 files changed, 382 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerJsonPlanTest.java
deleted file mode 100644
index a8690eec56f..00000000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerJsonPlanTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/** Test json serialization/deserialization for watermark assigner. */
-class WatermarkAssignerJsonPlanTest extends TableTestBase {
-
-    private StreamTableTestUtil util;
-    private TableEnvironment tEnv;
-
-    @BeforeEach
-    void setup() {
-        util = streamTestUtil(TableConfig.getDefault());
-        tEnv = util.getTableEnv();
-    }
-
-    @Test
-    void testWatermarkAssigner() {
-        String srcTableDdl =
-                "CREATE TABLE WatermarkTable (\n"
-                        + "  a bigint,\n"
-                        + "  b int,\n"
-                        + "  c timestamp(3),\n"
-                        + "  watermark for c as c - interval '5' second\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'bounded' = 'false',"
-                        + "  'enable-watermark-push-down' = 'false',"
-                        + "  'disable-lookup' = 'true')";
-        tEnv.executeSql(srcTableDdl);
-        String sinkTableDdl =
-                "CREATE TABLE sink (\n"
-                        + "  a bigint,\n"
-                        + "  b int,\n"
-                        + "  c timestamp(3)\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan("insert into sink select * from WatermarkTable");
-    }
-}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WatermarkAssignerJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WatermarkAssignerJsonPlanITCase.java
deleted file mode 100644
index bfc32e880a5..00000000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WatermarkAssignerJsonPlanITCase.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.runtime.stream.jsonplan;
-
-import org.apache.flink.table.planner.runtime.utils.TestData;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-
-import org.junit.jupiter.api.Test;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.HashMap;
-
-import static org.apache.flink.table.utils.DateTimeUtils.toLocalDateTime;
-
-/** Test for watermark assigner json plan. */
-class WatermarkAssignerJsonPlanITCase extends JsonPlanTestBase {
-
-    @Test
-    void testWatermarkAssigner() throws Exception {
-        createTestValuesSourceTable(
-                "MyTable",
-                JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
-                new String[] {
-                    "a int",
-                    "b bigint",
-                    "c varchar",
-                    "ts timestamp(3)",
-                    "watermark for ts as ts - interval '5' second"
-                },
-                new HashMap<String, String>() {
-                    {
-                        put("enable-watermark-push-down", "false");
-                    }
-                });
-
-        File sinkPath = createTestCsvSinkTable("MySink", "a int", "b bigint", "ts timestamp(3)");
-
-        compileSqlAndExecutePlan("insert into MySink select a, b, ts from MyTable where b = 3")
-                .await();
-
-        assertResult(
-                Arrays.asList(
-                        "4,3," + toLocalDateTime(4000L),
-                        "5,3," + toLocalDateTime(5000L),
-                        "6,3," + toLocalDateTime(6000L)),
-                sinkPath);
-    }
-
-    @Test
-    void testWatermarkPushDownWithMetadata() throws Exception {
-        // to verify FLINK-30598: the case declares metadata field first, without the fix it'll get
-        // wrong code generated by WatermarkGeneratorCodeGenerator which reference the incorrect
-        // varchar column as the watermark field.
-        createTestValuesSourceTable(
-                "MyTable",
-                JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
-                new String[] {
-                    "ts timestamp(3) metadata",
-                    "a int",
-                    "b bigint",
-                    "c varchar",
-                    "watermark for ts as ts - interval '5' second"
-                },
-                new HashMap<String, String>() {
-                    {
-                        put("enable-watermark-push-down", "true");
-                        put("readable-metadata", "ts:timestamp(3)");
-                    }
-                });
-
-        File sinkPath =
-                createTestCsvSinkTable(
-                        "MySink", "a int", "b bigint", "c varchar", "ts timestamp(3)");
-
-        compileSqlAndExecutePlan("insert into MySink select a, b, c, ts from MyTable where b = 3")
-                .await();
-
-        assertResult(
-                Arrays.asList(
-                        "4,3,Hello world, how are you?," + toLocalDateTime(4000L),
-                        "5,3,I am fine.," + toLocalDateTime(5000L),
-                        "6,3,Luke Skywalker," + toLocalDateTime(6000L)),
-                sinkPath);
-    }
-
-    @Test
-    void testWatermarkAndProjectPushDownWithMetadata() throws Exception {
-        createTestValuesSourceTable(
-                "MyTable",
-                JavaScalaConversionUtil.toJava(TestData.data3WithTimestamp()),
-                new String[] {
-                    "ts timestamp(3) metadata",
-                    "a int",
-                    "b bigint",
-                    "c varchar",
-                    "watermark for ts as ts - interval '5' second"
-                },
-                new HashMap<String, String>() {
-                    {
-                        put("enable-watermark-push-down", "true");
-                        put("readable-metadata", "ts:timestamp(3)");
-                    }
-                });
-
-        File sinkPath = createTestCsvSinkTable("MySink", "c varchar", "ts timestamp(3)");
-
-        compileSqlAndExecutePlan("insert into MySink select c, ts from MyTable where b = 3")
-                .await();
-
-        assertResult(
-                Arrays.asList(
-                        "Hello world, how are you?," + toLocalDateTime(4000L),
-                        "I am fine.," + toLocalDateTime(5000L),
-                        "Luke Skywalker," + toLocalDateTime(6000L)),
-                sinkPath);
-    }
-}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerJsonPlanTest_jsonplan/testWatermarkAssigner.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerJsonPlanTest_jsonplan/testWatermarkAssigner.out
deleted file mode 100644
index 162b6b74d0c..00000000000
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WatermarkAssignerJsonPlanTest_jsonplan/testWatermarkAssigner.out
+++ /dev/null
@@ -1,181 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`WatermarkTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT"
-            }, {
-              "name" : "c",
-              "dataType" : {
-                "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-                "precision" : 3,
-                "kind" : "ROWTIME"
-              }
-            } ],
-            "watermarkSpecs" : [ {
-              "rowtimeAttribute" : "c",
-              "expression" : {
-                "rexNode" : {
-                  "kind" : "CALL",
-                  "syntax" : "SPECIAL",
-                  "internalName" : "$-$1",
-                  "operands" : [ {
-                    "kind" : "INPUT_REF",
-                    "inputIndex" : 2,
-                    "type" : "TIMESTAMP(3)"
-                  }, {
-                    "kind" : "LITERAL",
-                    "value" : "5000",
-                    "type" : "INTERVAL SECOND(6) NOT NULL"
-                  } ],
-                  "type" : "TIMESTAMP(3)"
-                },
-                "serializableString" : "`c` - INTERVAL '5' SECOND"
-              }
-            } ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values",
-            "disable-lookup" : "true",
-            "enable-watermark-push-down" : "false"
-          }
-        }
-      }
-    },
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` TIMESTAMP(3)>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, WatermarkTable]], fields=[a, b, c])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-watermark-assigner_1",
-    "watermarkExpr" : {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$-$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 2,
-        "type" : "TIMESTAMP(3)"
-      }, {
-        "kind" : "LITERAL",
-        "value" : "5000",
-        "type" : "INTERVAL SECOND(6) NOT NULL"
-      } ],
-      "type" : "TIMESTAMP(3)"
-    },
-    "rowtimeFieldIndex" : 2,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "a",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "b",
-        "fieldType" : "INT"
-      }, {
-        "name" : "c",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      } ]
-    },
-    "description" : "WatermarkAssigner(rowtime=[c], watermark=[(c - 5000:INTERVAL SECOND)])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`sink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT"
-            }, {
-              "name" : "c",
-              "dataType" : "TIMESTAMP(3)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT" ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "a",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "b",
-        "fieldType" : "INT"
-      }, {
-        "name" : "c",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      } ]
-    },
-    "description" : "Sink(table=[default_catalog.default_database.sink], fields=[a, b, c])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}

Reply via email to