This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 68cc61a86187021c61e7f51ccff8c5912125d013
Author: Jim Hughes <[email protected]>
AuthorDate: Fri Mar 22 18:18:31 2024 -0400

    [FLINK-33676] Deleting WindowAggregateJsonPlanTest.java and 
WindowAggregateJsonITCase.java
---
 .../exec/stream/WindowAggregateJsonPlanTest.java   | 528 ---------------------
 .../stream/jsonplan/WindowAggregateJsonITCase.java | 243 ----------
 2 files changed, 771 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java
deleted file mode 100644
index 413b5a30062..00000000000
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java
+++ /dev/null
@@ -1,528 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/** Test json serialization/deserialization for window aggregate. */
-class WindowAggregateJsonPlanTest extends TableTestBase {
-
-    private StreamTableTestUtil util;
-    private TableEnvironment tEnv;
-
-    @BeforeEach
-    void setup() {
-        util = streamTestUtil(TableConfig.getDefault());
-        tEnv = util.getTableEnv();
-
-        String insertOnlyTableDdl =
-                "CREATE TABLE MyTable (\n"
-                        + " a INT,\n"
-                        + " b BIGINT,\n"
-                        + " c VARCHAR,\n"
-                        + " `rowtime` AS TO_TIMESTAMP(c),\n"
-                        + " proctime as PROCTIME(),\n"
-                        + " WATERMARK for `rowtime` AS `rowtime` - INTERVAL 
'1' SECOND\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(insertOnlyTableDdl);
-
-        String changelogTableDdl =
-                "CREATE TABLE MyCDCTable (\n"
-                        + " a INT,\n"
-                        + " b BIGINT,\n"
-                        + " c VARCHAR,\n"
-                        + " `rowtime` AS TO_TIMESTAMP(c),\n"
-                        + " proctime as PROCTIME(),\n"
-                        + " WATERMARK for `rowtime` AS `rowtime` - INTERVAL 
'1' SECOND\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values',\n"
-                        + " 'changelog-mode' = 'I,UA,UB,D')\n";
-        tEnv.executeSql(changelogTableDdl);
-    }
-
-    @Test
-    void testEventTimeTumbleWindow() {
-        tEnv.createFunction("concat_distinct_agg", 
ConcatDistinctAggFunction.class);
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " b BIGINT,\n"
-                        + " window_start TIMESTAMP(3),\n"
-                        + " window_end TIMESTAMP(3),\n"
-                        + " cnt BIGINT,\n"
-                        + " sum_a INT,\n"
-                        + " distinct_cnt BIGINT,\n"
-                        + " concat_distinct STRING\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select\n"
-                        + "  b,\n"
-                        + "  window_start,\n"
-                        + "  window_end,\n"
-                        + "  COUNT(*),\n"
-                        + "  SUM(a),\n"
-                        + "  COUNT(DISTINCT c),\n"
-                        + "  concat_distinct_agg(c)\n"
-                        + "FROM TABLE(\n"
-                        + "   TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), 
INTERVAL '5' SECOND))\n"
-                        + "GROUP BY b, window_start, window_end");
-    }
-
-    @Test
-    void testEventTimeTumbleWindowWithCDCSource() {
-        tEnv.createFunction("concat_distinct_agg", 
ConcatDistinctAggFunction.class);
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " b BIGINT,\n"
-                        + " window_start TIMESTAMP(3),\n"
-                        + " window_end TIMESTAMP(3),\n"
-                        + " cnt BIGINT,\n"
-                        + " sum_a INT,\n"
-                        + " distinct_cnt BIGINT,\n"
-                        + " concat_distinct STRING\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select\n"
-                        + "  b,\n"
-                        + "  window_start,\n"
-                        + "  window_end,\n"
-                        + "  COUNT(*),\n"
-                        + "  SUM(a),\n"
-                        + "  COUNT(DISTINCT c),\n"
-                        + "  concat_distinct_agg(c)\n"
-                        + "FROM TABLE(\n"
-                        + "   TUMBLE(TABLE MyCDCTable, DESCRIPTOR(rowtime), 
INTERVAL '5' SECOND))\n"
-                        + "GROUP BY b, window_start, window_end");
-    }
-
-    @Test
-    void testEventTimeTumbleWindowWithOffset() {
-        tEnv.createFunction("concat_distinct_agg", 
ConcatDistinctAggFunction.class);
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " b BIGINT,\n"
-                        + " window_start TIMESTAMP(3),\n"
-                        + " window_end TIMESTAMP(3),\n"
-                        + " cnt BIGINT,\n"
-                        + " sum_a INT,\n"
-                        + " distinct_cnt BIGINT,\n"
-                        + " concat_distinct STRING\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select\n"
-                        + "  b,\n"
-                        + "  window_start,\n"
-                        + "  window_end,\n"
-                        + "  COUNT(*),\n"
-                        + "  SUM(a),\n"
-                        + "  COUNT(DISTINCT c),\n"
-                        + "  concat_distinct_agg(c)\n"
-                        + "FROM TABLE(\n"
-                        + "   TUMBLE(\n"
-                        + "     TABLE MyTable,\n"
-                        + "     DESCRIPTOR(rowtime),\n"
-                        + "     INTERVAL '5' SECOND,\n"
-                        + "     INTERVAL '5' SECOND))\n"
-                        + "GROUP BY b, window_start, window_end");
-    }
-
-    @Test
-    void testProcTimeTumbleWindow() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " b BIGINT,\n"
-                        + " window_end TIMESTAMP(3),\n"
-                        + " cnt BIGINT\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select\n"
-                        + "  b,\n"
-                        + "  window_end,\n"
-                        + "  COUNT(*)\n"
-                        + "FROM TABLE(\n"
-                        + "   TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), 
INTERVAL '15' MINUTE))\n"
-                        + "GROUP BY b, window_start, window_end");
-    }
-
-    @Test
-    void testProcTimeTumbleWindowWithCDCSource() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " b BIGINT,\n"
-                        + " window_end TIMESTAMP(3),\n"
-                        + " cnt BIGINT\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select\n"
-                        + "  b,\n"
-                        + "  window_end,\n"
-                        + "  COUNT(*)\n"
-                        + "FROM TABLE(\n"
-                        + "   TUMBLE(TABLE MyCDCTable, DESCRIPTOR(proctime), 
INTERVAL '15' MINUTE))\n"
-                        + "GROUP BY b, window_start, window_end");
-    }
-
-    @Test
-    void testEventTimeHopWindow() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " b BIGINT,\n"
-                        + " cnt BIGINT,\n"
-                        + " sum_a INT\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select\n"
-                        + "  b,\n"
-                        + "  COUNT(c),\n"
-                        + "  SUM(a)\n"
-                        + "FROM TABLE(\n"
-                        + "   HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL 
'5' SECOND, INTERVAL '10' SECOND))\n"
-                        + "GROUP BY b, window_start, window_end");
-    }
-
-    @Test
-    void testEventTimeHopWindowWithCDCSource() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " b BIGINT,\n"
-                        + " cnt BIGINT,\n"
-                        + " sum_a INT\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select\n"
-                        + "  b,\n"
-                        + "  COUNT(c),\n"
-                        + "  SUM(a)\n"
-                        + "FROM TABLE(\n"
-                        + "   HOP(TABLE MyCDCTable, DESCRIPTOR(rowtime), 
INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n"
-                        + "GROUP BY b, window_start, window_end");
-    }
-
-    @Test
-    void testEventTimeHopWindowWithOffset() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " b BIGINT,\n"
-                        + " cnt BIGINT,\n"
-                        + " sum_a INT\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select\n"
-                        + "  b,\n"
-                        + "  COUNT(c),\n"
-                        + "  SUM(a)\n"
-                        + "FROM TABLE(\n"
-                        + "   HOP(\n"
-                        + "     TABLE MyTable,\n"
-                        + "     DESCRIPTOR(rowtime),\n"
-                        + "     INTERVAL '5' SECOND,\n"
-                        + "     INTERVAL '10' SECOND,\n"
-                        + "     INTERVAL '5' SECOND))\n"
-                        + "GROUP BY b, window_start, window_end");
-    }
-
-    @Test
-    void testProcTimeHopWindow() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " b BIGINT,\n"
-                        + " sum_a INT\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select\n"
-                        + "  b,\n"
-                        + "  SUM(a)\n"
-                        + "FROM TABLE(\n"
-                        + "   HOP(TABLE MyTable, DESCRIPTOR(proctime), 
INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n"
-                        + "GROUP BY b, window_start, window_end");
-    }
-
-    @Test
-    void testProcTimeHopWindowWithCDCSource() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " b BIGINT,\n"
-                        + " sum_a INT\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select\n"
-                        + "  b,\n"
-                        + "  SUM(a)\n"
-                        + "FROM TABLE(\n"
-                        + "   HOP(TABLE MyCDCTable, DESCRIPTOR(proctime), 
INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n"
-                        + "GROUP BY b, window_start, window_end");
-    }
-
-    @Test
-    void testEventTimeCumulateWindow() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " b BIGINT,\n"
-                        + " window_end TIMESTAMP(3),\n"
-                        + " cnt BIGINT,\n"
-                        + " sum_a INT\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select\n"
-                        + "  b,\n"
-                        + "  window_end,\n"
-                        + "  COUNT(c),\n"
-                        + "  SUM(a)\n"
-                        + "FROM TABLE(\n"
-                        + "   CUMULATE(\n"
-                        + "     TABLE MyTable,\n"
-                        + "     DESCRIPTOR(rowtime),\n"
-                        + "     INTERVAL '5' SECOND,\n"
-                        + "     INTERVAL '15' SECOND))\n"
-                        + "GROUP BY b, window_start, window_end");
-    }
-
-    @Test
-    void testEventTimeCumulateWindowWithCDCSource() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " b BIGINT,\n"
-                        + " window_end TIMESTAMP(3),\n"
-                        + " cnt BIGINT,\n"
-                        + " sum_a INT\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select\n"
-                        + "  b,\n"
-                        + "  window_end,\n"
-                        + "  COUNT(c),\n"
-                        + "  SUM(a)\n"
-                        + "FROM TABLE(\n"
-                        + "   CUMULATE(\n"
-                        + "     TABLE MyCDCTable,\n"
-                        + "     DESCRIPTOR(rowtime),\n"
-                        + "     INTERVAL '5' SECOND,\n"
-                        + "     INTERVAL '15' SECOND))\n"
-                        + "GROUP BY b, window_start, window_end");
-    }
-
-    @Test
-    void testEventTimeCumulateWindowWithOffset() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " b BIGINT,\n"
-                        + " window_end TIMESTAMP(3),\n"
-                        + " cnt BIGINT,\n"
-                        + " sum_a INT\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select\n"
-                        + "  b,\n"
-                        + "  window_end,\n"
-                        + "  COUNT(c),\n"
-                        + "  SUM(a)\n"
-                        + "FROM TABLE(\n"
-                        + "   CUMULATE(\n"
-                        + "     TABLE MyTable,\n"
-                        + "     DESCRIPTOR(rowtime),\n"
-                        + "     INTERVAL '5' SECOND,\n"
-                        + "     INTERVAL '15' SECOND,\n"
-                        + "     INTERVAL '15' SECOND))\n"
-                        + "GROUP BY b, window_start, window_end");
-    }
-
-    @Test
-    void testProcTimeCumulateWindow() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " b BIGINT,\n"
-                        + " cnt BIGINT\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select\n"
-                        + "  b,\n"
-                        + "  COUNT(c)\n"
-                        + "FROM TABLE(\n"
-                        + "   CUMULATE(\n"
-                        + "     TABLE MyTable,\n"
-                        + "     DESCRIPTOR(proctime),\n"
-                        + "     INTERVAL '5' SECOND,\n"
-                        + "     INTERVAL '15' SECOND))\n"
-                        + "GROUP BY b, window_start, window_end");
-    }
-
-    @Test
-    void testProcTimeCumulateWindowWithCDCSource() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " b BIGINT,\n"
-                        + " cnt BIGINT\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select\n"
-                        + "  b,\n"
-                        + "  COUNT(c)\n"
-                        + "FROM TABLE(\n"
-                        + "   CUMULATE(\n"
-                        + "     TABLE MyCDCTable,\n"
-                        + "     DESCRIPTOR(proctime),\n"
-                        + "     INTERVAL '5' SECOND,\n"
-                        + "     INTERVAL '15' SECOND))\n"
-                        + "GROUP BY b, window_start, window_end");
-    }
-
-    @Test
-    void testDistinctSplitEnabled() {
-        tEnv.getConfig()
-                
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a bigint,\n"
-                        + "  window_start timestamp(3),\n"
-                        + "  window_end timestamp(3),\n"
-                        + "  cnt_star bigint,\n"
-                        + "  sum_b bigint,\n"
-                        + "  cnt_distinct_c bigint\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'sink-insert-only' = 'false',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-
-        util.verifyJsonPlan(
-                "insert into MySink select a, "
-                        + "   window_start, "
-                        + "   window_end, "
-                        + "   count(*), "
-                        + "   sum(b), "
-                        + "   count(distinct c) AS uv "
-                        + "FROM TABLE ("
-                        + "   CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), 
INTERVAL '10' MINUTE, INTERVAL '1' HOUR)) "
-                        + "GROUP BY a, window_start, window_end");
-    }
-
-    @Test
-    void testProcTimeSessionWindowWithoutPartitionKey() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " window_end TIMESTAMP(3),\n"
-                        + " cnt BIGINT\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select\n"
-                        + "  window_end,\n"
-                        + "  COUNT(*)\n"
-                        + "FROM TABLE(\n"
-                        + "   SESSION(TABLE MyTable, DESCRIPTOR(proctime), 
INTERVAL '15' MINUTE))\n"
-                        + "GROUP BY window_start, window_end");
-    }
-
-    @Test
-    void testProcTimeSessionWindowWithPartitionKey() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " b BIGINT,\n"
-                        + " window_end TIMESTAMP(3),\n"
-                        + " cnt BIGINT\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select\n"
-                        + "  b,\n"
-                        + "  window_end,\n"
-                        + "  COUNT(*)\n"
-                        + "FROM TABLE(\n"
-                        + "   SESSION(TABLE MyTable PARTITION BY b, 
DESCRIPTOR(proctime), INTERVAL '15' MINUTE))\n"
-                        + "GROUP BY b, window_start, window_end");
-    }
-
-    @Test
-    void testEventTimeSessionWindowWithoutPartitionKey() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " window_end TIMESTAMP(3),\n"
-                        + " cnt BIGINT\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select\n"
-                        + "  window_end,\n"
-                        + "  COUNT(*)\n"
-                        + "FROM TABLE(\n"
-                        + "   SESSION(TABLE MyTable, DESCRIPTOR(rowtime), 
INTERVAL '15' MINUTE))\n"
-                        + "GROUP BY window_start, window_end");
-    }
-
-    @Test
-    void testEventTimeSessionWindowWithPartitionKey() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " b BIGINT,\n"
-                        + " window_end TIMESTAMP(3),\n"
-                        + " cnt BIGINT\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values')\n";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select\n"
-                        + "  b,\n"
-                        + "  window_end,\n"
-                        + "  COUNT(*)\n"
-                        + "FROM TABLE(\n"
-                        + "   SESSION(TABLE MyTable PARTITION BY b, 
DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n"
-                        + "GROUP BY b, window_start, window_end");
-    }
-}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
deleted file mode 100644
index 47983bd3d2e..00000000000
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.runtime.stream.jsonplan;
-
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.table.planner.runtime.utils.TestData;
-import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
-import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-/** Test for window aggregate json plan. */
-@ExtendWith(ParameterizedTestExtension.class)
-class WindowAggregateJsonITCase extends JsonPlanTestBase {
-
-    @Parameters(name = "agg_phase = {0}")
-    private static Object[] parameters() {
-        return new Object[][] {
-            new Object[] {AggregatePhaseStrategy.ONE_PHASE},
-            new Object[] {AggregatePhaseStrategy.TWO_PHASE}
-        };
-    }
-
-    @Parameter private AggregatePhaseStrategy aggPhase;
-
-    @BeforeEach
-    @Override
-    protected void setup() throws Exception {
-        super.setup();
-        createTestValuesSourceTable(
-                "MyTable",
-                
JavaScalaConversionUtil.toJava(TestData.windowDataWithTimestamp()),
-                new String[] {
-                    "ts STRING",
-                    "`int` INT",
-                    "`double` DOUBLE",
-                    "`float` FLOAT",
-                    "`bigdec` DECIMAL(10, 2)",
-                    "`string` STRING",
-                    "`name` STRING",
-                    "`rowtime` AS TO_TIMESTAMP(`ts`)",
-                    "WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' 
SECOND",
-                },
-                new HashMap<String, String>() {
-                    {
-                        put("enable-watermark-push-down", "true");
-                        put("failing-source", "true");
-                    }
-                });
-        tableEnv.getConfig()
-                .set(
-                        
OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
-                        aggPhase.toString());
-    }
-
-    @TestTemplate
-    void testEventTimeTumbleWindow() throws Exception {
-        createTestValuesSinkTable(
-                "MySink",
-                "name STRING",
-                "window_start TIMESTAMP(3)",
-                "window_end TIMESTAMP(3)",
-                "cnt BIGINT",
-                "sum_int INT",
-                "distinct_cnt BIGINT");
-        compileSqlAndExecutePlan(
-                        "insert into MySink select\n"
-                                + "  name,\n"
-                                + "  window_start,\n"
-                                + "  window_end,\n"
-                                + "  COUNT(*),\n"
-                                + "  SUM(`int`),\n"
-                                + "  COUNT(DISTINCT `string`)\n"
-                                + "FROM TABLE(\n"
-                                + "   TUMBLE(TABLE MyTable, 
DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
-                                + "GROUP BY name, window_start, window_end")
-                .await();
-
-        List<String> result = 
TestValuesTableFactory.getResultsAsStrings("MySink");
-        assertResult(
-                Arrays.asList(
-                        "+I[a, 2020-10-10T00:00, 2020-10-10T00:00:05, 4, 10, 
2]",
-                        "+I[a, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 1, 3, 
1]",
-                        "+I[b, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 2, 9, 
2]",
-                        "+I[b, 2020-10-10T00:00:15, 2020-10-10T00:00:20, 1, 4, 
1]",
-                        "+I[b, 2020-10-10T00:00:30, 2020-10-10T00:00:35, 1, 1, 
1]",
-                        "+I[null, 2020-10-10T00:00:30, 2020-10-10T00:00:35, 1, 
7, 0]"),
-                result);
-    }
-
-    @TestTemplate
-    void testEventTimeHopWindow() throws Exception {
-        createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
-        compileSqlAndExecutePlan(
-                        "insert into MySink select\n"
-                                + "  name,\n"
-                                + "  COUNT(*)\n"
-                                + "FROM TABLE(\n"
-                                + "   HOP(TABLE MyTable, DESCRIPTOR(rowtime), 
INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n"
-                                + "GROUP BY name, window_start, window_end")
-                .await();
-
-        List<String> result = 
TestValuesTableFactory.getResultsAsStrings("MySink");
-        assertResult(
-                Arrays.asList(
-                        "+I[a, 1]",
-                        "+I[a, 4]",
-                        "+I[a, 6]",
-                        "+I[b, 1]",
-                        "+I[b, 1]",
-                        "+I[b, 1]",
-                        "+I[b, 1]",
-                        "+I[b, 2]",
-                        "+I[b, 2]",
-                        "+I[null, 1]",
-                        "+I[null, 1]"),
-                result);
-    }
-
-    @TestTemplate
-    void testEventTimeCumulateWindow() throws Exception {
-        createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
-        compileSqlAndExecutePlan(
-                        "insert into MySink select\n"
-                                + "  name,\n"
-                                + "  COUNT(*)\n"
-                                + "FROM TABLE(\n"
-                                + "  CUMULATE(\n"
-                                + "     TABLE MyTable,\n"
-                                + "     DESCRIPTOR(rowtime),\n"
-                                + "     INTERVAL '5' SECOND,\n"
-                                + "     INTERVAL '15' SECOND))"
-                                + "GROUP BY name, window_start, window_end")
-                .await();
-
-        List<String> result = 
TestValuesTableFactory.getResultsAsStrings("MySink");
-        assertResult(
-                Arrays.asList(
-                        "+I[a, 4]",
-                        "+I[a, 6]",
-                        "+I[a, 6]",
-                        "+I[b, 1]",
-                        "+I[b, 1]",
-                        "+I[b, 1]",
-                        "+I[b, 1]",
-                        "+I[b, 1]",
-                        "+I[b, 1]",
-                        "+I[b, 2]",
-                        "+I[b, 2]",
-                        "+I[null, 1]",
-                        "+I[null, 1]",
-                        "+I[null, 1]"),
-                result);
-    }
-
-    @TestTemplate
-    void testDistinctSplitEnabled() throws Exception {
-        tableEnv.getConfig()
-                
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
-        createTestValuesSinkTable(
-                "MySink", "name STRING", "max_double DOUBLE", 
"cnt_distinct_int BIGINT");
-
-        compileSqlAndExecutePlan(
-                        "insert into MySink select name, "
-                                + "   max(`double`),\n"
-                                + "   count(distinct `int`) "
-                                + "FROM TABLE ("
-                                + "  CUMULATE(\n"
-                                + "     TABLE MyTable,\n"
-                                + "     DESCRIPTOR(rowtime),\n"
-                                + "     INTERVAL '5' SECOND,\n"
-                                + "     INTERVAL '15' SECOND))"
-                                + "GROUP BY name, window_start, window_end")
-                .await();
-
-        List<String> result = 
TestValuesTableFactory.getResultsAsStrings("MySink");
-        assertResult(
-                Arrays.asList(
-                        "+I[a, 5.0, 3]",
-                        "+I[a, 5.0, 4]",
-                        "+I[a, 5.0, 4]",
-                        "+I[b, 3.0, 1]",
-                        "+I[b, 3.0, 1]",
-                        "+I[b, 3.0, 1]",
-                        "+I[b, 4.0, 1]",
-                        "+I[b, 4.0, 1]",
-                        "+I[b, 4.0, 1]",
-                        "+I[b, 6.0, 2]",
-                        "+I[b, 6.0, 2]",
-                        "+I[null, 7.0, 1]",
-                        "+I[null, 7.0, 1]",
-                        "+I[null, 7.0, 1]"),
-                result);
-    }
-
-    @TestTemplate
-    public void testEventTimeSessionWindow() throws Exception {
-        createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
-        compileSqlAndExecutePlan(
-                        "insert into MySink select\n"
-                                + "  name,\n"
-                                + "  COUNT(*)\n"
-                                + "FROM TABLE(\n"
-                                + "  SESSION(\n"
-                                + "     TABLE MyTable PARTITION BY name,\n"
-                                + "     DESCRIPTOR(rowtime),\n"
-                                + "     INTERVAL '5' SECOND))"
-                                + "GROUP BY name, window_start, window_end")
-                .await();
-
-        List<String> result = 
TestValuesTableFactory.getResultsAsStrings("MySink");
-        assertResult(
-                Arrays.asList("+I[a, 6]", "+I[b, 1]", "+I[b, 1]", "+I[b, 2]", 
"+I[null, 1]"),
-                result);
-    }
-}

Reply via email to