This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 68cc61a86187021c61e7f51ccff8c5912125d013 Author: Jim Hughes <[email protected]> AuthorDate: Fri Mar 22 18:18:31 2024 -0400 [FLINK-33676] Deleting WindowAggregateJsonPlanTest.java and WindowAggregateJsonITCase.java --- .../exec/stream/WindowAggregateJsonPlanTest.java | 528 --------------------- .../stream/jsonplan/WindowAggregateJsonITCase.java | 243 ---------- 2 files changed, 771 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java deleted file mode 100644 index 413b5a30062..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java +++ /dev/null @@ -1,528 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream; - -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.config.OptimizerConfigOptions; -import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction; -import org.apache.flink.table.planner.utils.StreamTableTestUtil; -import org.apache.flink.table.planner.utils.TableTestBase; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** Test json serialization/deserialization for window aggregate. */ -class WindowAggregateJsonPlanTest extends TableTestBase { - - private StreamTableTestUtil util; - private TableEnvironment tEnv; - - @BeforeEach - void setup() { - util = streamTestUtil(TableConfig.getDefault()); - tEnv = util.getTableEnv(); - - String insertOnlyTableDdl = - "CREATE TABLE MyTable (\n" - + " a INT,\n" - + " b BIGINT,\n" - + " c VARCHAR,\n" - + " `rowtime` AS TO_TIMESTAMP(c),\n" - + " proctime as PROCTIME(),\n" - + " WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(insertOnlyTableDdl); - - String changelogTableDdl = - "CREATE TABLE MyCDCTable (\n" - + " a INT,\n" - + " b BIGINT,\n" - + " c VARCHAR,\n" - + " `rowtime` AS TO_TIMESTAMP(c),\n" - + " proctime as PROCTIME(),\n" - + " WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n" - + ") WITH (\n" - + " 'connector' = 'values',\n" - + " 'changelog-mode' = 'I,UA,UB,D')\n"; - tEnv.executeSql(changelogTableDdl); - } - - @Test - void testEventTimeTumbleWindow() { - tEnv.createFunction("concat_distinct_agg", ConcatDistinctAggFunction.class); - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b BIGINT,\n" - + " window_start TIMESTAMP(3),\n" - + " window_end TIMESTAMP(3),\n" - + " cnt BIGINT,\n" - + " sum_a INT,\n" - + " distinct_cnt BIGINT,\n" - + " concat_distinct STRING\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select\n" - + " b,\n" - + " window_start,\n" - + " window_end,\n" - + " COUNT(*),\n" - + " SUM(a),\n" - + " COUNT(DISTINCT c),\n" - + " concat_distinct_agg(c)\n" - + "FROM TABLE(\n" - + " TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n" - + "GROUP BY b, window_start, window_end"); - } - - @Test - void testEventTimeTumbleWindowWithCDCSource() { - tEnv.createFunction("concat_distinct_agg", ConcatDistinctAggFunction.class); - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b BIGINT,\n" - + " window_start TIMESTAMP(3),\n" - + " window_end TIMESTAMP(3),\n" - + " cnt BIGINT,\n" - + " sum_a INT,\n" - + " distinct_cnt BIGINT,\n" - + " concat_distinct STRING\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select\n" - + " b,\n" - + " window_start,\n" - + " window_end,\n" - + " COUNT(*),\n" - + " SUM(a),\n" - + " COUNT(DISTINCT c),\n" - + " concat_distinct_agg(c)\n" - + "FROM TABLE(\n" - + " TUMBLE(TABLE MyCDCTable, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n" - + "GROUP BY b, window_start, window_end"); - } - - @Test - void testEventTimeTumbleWindowWithOffset() { - tEnv.createFunction("concat_distinct_agg", ConcatDistinctAggFunction.class); - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b BIGINT,\n" - + " window_start TIMESTAMP(3),\n" - + " window_end TIMESTAMP(3),\n" - + " cnt BIGINT,\n" - + " sum_a INT,\n" - + " distinct_cnt BIGINT,\n" - + " concat_distinct STRING\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select\n" - + " b,\n" - + " window_start,\n" - + " window_end,\n" - + " COUNT(*),\n" - + " SUM(a),\n" - + " COUNT(DISTINCT c),\n" - + " concat_distinct_agg(c)\n" - + "FROM TABLE(\n" - + " TUMBLE(\n" - + " TABLE MyTable,\n" - + " DESCRIPTOR(rowtime),\n" - + " INTERVAL '5' SECOND,\n" - + " INTERVAL '5' SECOND))\n" - + "GROUP BY b, window_start, window_end"); - } - - @Test - void testProcTimeTumbleWindow() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b BIGINT,\n" - + " window_end TIMESTAMP(3),\n" - + " cnt BIGINT\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select\n" - + " b,\n" - + " window_end,\n" - + " COUNT(*)\n" - + "FROM TABLE(\n" - + " TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))\n" - + "GROUP BY b, window_start, window_end"); - } - - @Test - void testProcTimeTumbleWindowWithCDCSource() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b BIGINT,\n" - + " window_end TIMESTAMP(3),\n" - + " cnt BIGINT\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select\n" - + " b,\n" - + " window_end,\n" - + " COUNT(*)\n" - + "FROM TABLE(\n" - + " TUMBLE(TABLE MyCDCTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))\n" - + "GROUP BY b, window_start, window_end"); - } - - @Test - void testEventTimeHopWindow() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b BIGINT,\n" - + " cnt BIGINT,\n" - + " sum_a INT\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select\n" - + " b,\n" - + " COUNT(c),\n" - + " SUM(a)\n" - + "FROM TABLE(\n" - + " HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n" - + "GROUP BY b, window_start, window_end"); - } - - @Test - void testEventTimeHopWindowWithCDCSource() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b BIGINT,\n" - + " cnt BIGINT,\n" - + " sum_a INT\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select\n" - + " b,\n" - + " COUNT(c),\n" - + " SUM(a)\n" - + "FROM TABLE(\n" - + " HOP(TABLE MyCDCTable, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n" - + "GROUP BY b, window_start, window_end"); - } - - @Test - void testEventTimeHopWindowWithOffset() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b BIGINT,\n" - + " cnt BIGINT,\n" - + " sum_a INT\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select\n" - + " b,\n" - + " COUNT(c),\n" - + " SUM(a)\n" - + "FROM TABLE(\n" - + " HOP(\n" - + " TABLE MyTable,\n" - + " DESCRIPTOR(rowtime),\n" - + " INTERVAL '5' SECOND,\n" - + " INTERVAL '10' SECOND,\n" - + " INTERVAL '5' SECOND))\n" - + "GROUP BY b, window_start, window_end"); - } - - @Test - void testProcTimeHopWindow() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b BIGINT,\n" - + " sum_a INT\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select\n" - + " b,\n" - + " SUM(a)\n" - + "FROM TABLE(\n" - + " HOP(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n" - + "GROUP BY b, window_start, window_end"); - } - - @Test - void testProcTimeHopWindowWithCDCSource() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b BIGINT,\n" - + " sum_a INT\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select\n" - + " b,\n" - + " SUM(a)\n" - + "FROM TABLE(\n" - + " HOP(TABLE MyCDCTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n" - + "GROUP BY b, window_start, window_end"); - } - - @Test - void testEventTimeCumulateWindow() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b BIGINT,\n" - + " window_end TIMESTAMP(3),\n" - + " cnt BIGINT,\n" - + " sum_a INT\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select\n" - + " b,\n" - + " window_end,\n" - + " COUNT(c),\n" - + " SUM(a)\n" - + "FROM TABLE(\n" - + " CUMULATE(\n" - + " TABLE MyTable,\n" - + " DESCRIPTOR(rowtime),\n" - + " INTERVAL '5' SECOND,\n" - + " INTERVAL '15' SECOND))\n" - + "GROUP BY b, window_start, window_end"); - } - - @Test - void testEventTimeCumulateWindowWithCDCSource() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b BIGINT,\n" - + " window_end TIMESTAMP(3),\n" - + " cnt BIGINT,\n" - + " sum_a INT\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select\n" - + " b,\n" - + " window_end,\n" - + " COUNT(c),\n" - + " SUM(a)\n" - + "FROM TABLE(\n" - + " CUMULATE(\n" - + " TABLE MyCDCTable,\n" - + " DESCRIPTOR(rowtime),\n" - + " INTERVAL '5' SECOND,\n" - + " INTERVAL '15' SECOND))\n" - + "GROUP BY b, window_start, window_end"); - } - - @Test - void testEventTimeCumulateWindowWithOffset() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b BIGINT,\n" - + " window_end TIMESTAMP(3),\n" - + " cnt BIGINT,\n" - + " sum_a INT\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select\n" - + " b,\n" - + " window_end,\n" - + " COUNT(c),\n" - + " SUM(a)\n" - + "FROM TABLE(\n" - + " CUMULATE(\n" - + " TABLE MyTable,\n" - + " DESCRIPTOR(rowtime),\n" - + " INTERVAL '5' SECOND,\n" - + " INTERVAL '15' SECOND,\n" - + " INTERVAL '15' SECOND))\n" - + "GROUP BY b, window_start, window_end"); - } - - @Test - void testProcTimeCumulateWindow() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b BIGINT,\n" - + " cnt BIGINT\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select\n" - + " b,\n" - + " COUNT(c)\n" - + "FROM TABLE(\n" - + " CUMULATE(\n" - + " TABLE MyTable,\n" - + " DESCRIPTOR(proctime),\n" - + " INTERVAL '5' SECOND,\n" - + " INTERVAL '15' SECOND))\n" - + "GROUP BY b, window_start, window_end"); - } - - @Test - void testProcTimeCumulateWindowWithCDCSource() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b BIGINT,\n" - + " cnt BIGINT\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select\n" - + " b,\n" - + " COUNT(c)\n" - + "FROM TABLE(\n" - + " CUMULATE(\n" - + " TABLE MyCDCTable,\n" - + " DESCRIPTOR(proctime),\n" - + " INTERVAL '5' SECOND,\n" - + " INTERVAL '15' SECOND))\n" - + "GROUP BY b, window_start, window_end"); - } - - @Test - void testDistinctSplitEnabled() { - tEnv.getConfig() - .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true); - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a bigint,\n" - + " window_start timestamp(3),\n" - + " window_end timestamp(3),\n" - + " cnt_star bigint,\n" - + " sum_b bigint,\n" - + " cnt_distinct_c bigint\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'sink-insert-only' = 'false',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - - util.verifyJsonPlan( - "insert into MySink select a, " - + " window_start, " - + " window_end, " - + " count(*), " - + " sum(b), " - + " count(distinct c) AS uv " - + "FROM TABLE (" - + " CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR)) " - + "GROUP BY a, window_start, window_end"); - } - - @Test - void testProcTimeSessionWindowWithoutPartitionKey() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " window_end TIMESTAMP(3),\n" - + " cnt BIGINT\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select\n" - + " window_end,\n" - + " COUNT(*)\n" - + "FROM TABLE(\n" - + " SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))\n" - + "GROUP BY window_start, window_end"); - } - - @Test - void testProcTimeSessionWindowWithPartitionKey() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b BIGINT,\n" - + " window_end TIMESTAMP(3),\n" - + " cnt BIGINT\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select\n" - + " b,\n" - + " window_end,\n" - + " COUNT(*)\n" - + "FROM TABLE(\n" - + " SESSION(TABLE MyTable PARTITION BY b, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))\n" - + "GROUP BY b, window_start, window_end"); - } - - @Test - void testEventTimeSessionWindowWithoutPartitionKey() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " window_end TIMESTAMP(3),\n" - + " cnt BIGINT\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select\n" - + " window_end,\n" - + " COUNT(*)\n" - + "FROM TABLE(\n" - + " SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n" - + "GROUP BY window_start, window_end"); - } - - @Test - void testEventTimeSessionWindowWithPartitionKey() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b BIGINT,\n" - + " window_end TIMESTAMP(3),\n" - + " cnt BIGINT\n" - + ") WITH (\n" - + " 'connector' = 'values')\n"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select\n" - + " b,\n" - + " window_end,\n" - + " COUNT(*)\n" - + "FROM TABLE(\n" - + " SESSION(TABLE MyTable PARTITION BY b, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n" - + "GROUP BY b, window_start, window_end"); - } -} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java deleted file mode 100644 index 47983bd3d2e..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.runtime.stream.jsonplan; - -import org.apache.flink.table.api.config.OptimizerConfigOptions; -import org.apache.flink.table.planner.factories.TestValuesTableFactory; -import org.apache.flink.table.planner.runtime.utils.TestData; -import org.apache.flink.table.planner.utils.AggregatePhaseStrategy; -import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; -import org.apache.flink.table.planner.utils.JsonPlanTestBase; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; -import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; - -/** Test for window aggregate json plan. */ -@ExtendWith(ParameterizedTestExtension.class) -class WindowAggregateJsonITCase extends JsonPlanTestBase { - - @Parameters(name = "agg_phase = {0}") - private static Object[] parameters() { - return new Object[][] { - new Object[] {AggregatePhaseStrategy.ONE_PHASE}, - new Object[] {AggregatePhaseStrategy.TWO_PHASE} - }; - } - - @Parameter private AggregatePhaseStrategy aggPhase; - - @BeforeEach - @Override - protected void setup() throws Exception { - super.setup(); - createTestValuesSourceTable( - "MyTable", - JavaScalaConversionUtil.toJava(TestData.windowDataWithTimestamp()), - new String[] { - "ts STRING", - "`int` INT", - "`double` DOUBLE", - "`float` FLOAT", - "`bigdec` DECIMAL(10, 2)", - "`string` STRING", - "`name` STRING", - "`rowtime` AS TO_TIMESTAMP(`ts`)", - "WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND", - }, - new HashMap<String, String>() { - { - put("enable-watermark-push-down", "true"); - put("failing-source", "true"); - } - }); - tableEnv.getConfig() - .set( - OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, - aggPhase.toString()); - } - - @TestTemplate - void testEventTimeTumbleWindow() throws Exception { - createTestValuesSinkTable( - "MySink", - "name STRING", - "window_start TIMESTAMP(3)", - "window_end TIMESTAMP(3)", - "cnt BIGINT", - "sum_int INT", - "distinct_cnt BIGINT"); - compileSqlAndExecutePlan( - "insert into MySink select\n" - + " name,\n" - + " window_start,\n" - + " window_end,\n" - + " COUNT(*),\n" - + " SUM(`int`),\n" - + " COUNT(DISTINCT `string`)\n" - + "FROM TABLE(\n" - + " TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n" - + "GROUP BY name, window_start, window_end") - .await(); - - List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink"); - assertResult( - Arrays.asList( - "+I[a, 2020-10-10T00:00, 2020-10-10T00:00:05, 4, 10, 2]", - "+I[a, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 1, 3, 1]", - "+I[b, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 2, 9, 2]", - "+I[b, 2020-10-10T00:00:15, 2020-10-10T00:00:20, 1, 4, 1]", - "+I[b, 2020-10-10T00:00:30, 2020-10-10T00:00:35, 1, 1, 1]", - "+I[null, 2020-10-10T00:00:30, 2020-10-10T00:00:35, 1, 7, 0]"), - result); - } - - @TestTemplate - void testEventTimeHopWindow() throws Exception { - createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT"); - compileSqlAndExecutePlan( - "insert into MySink select\n" - + " name,\n" - + " COUNT(*)\n" - + "FROM TABLE(\n" - + " HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n" - + "GROUP BY name, window_start, window_end") - .await(); - - List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink"); - assertResult( - Arrays.asList( - "+I[a, 1]", - "+I[a, 4]", - "+I[a, 6]", - "+I[b, 1]", - "+I[b, 1]", - "+I[b, 1]", - "+I[b, 1]", - "+I[b, 2]", - "+I[b, 2]", - "+I[null, 1]", - "+I[null, 1]"), - result); - } - - @TestTemplate - void testEventTimeCumulateWindow() throws Exception { - createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT"); - compileSqlAndExecutePlan( - "insert into MySink select\n" - + " name,\n" - + " COUNT(*)\n" - + "FROM TABLE(\n" - + " CUMULATE(\n" - + " TABLE MyTable,\n" - + " DESCRIPTOR(rowtime),\n" - + " INTERVAL '5' SECOND,\n" - + " INTERVAL '15' SECOND))" - + "GROUP BY name, window_start, window_end") - .await(); - - List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink"); - assertResult( - Arrays.asList( - "+I[a, 4]", - "+I[a, 6]", - "+I[a, 6]", - "+I[b, 1]", - "+I[b, 1]", - "+I[b, 1]", - "+I[b, 1]", - "+I[b, 1]", - "+I[b, 1]", - "+I[b, 2]", - "+I[b, 2]", - "+I[null, 1]", - "+I[null, 1]", - "+I[null, 1]"), - result); - } - - @TestTemplate - void testDistinctSplitEnabled() throws Exception { - tableEnv.getConfig() - .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true); - createTestValuesSinkTable( - "MySink", "name STRING", "max_double DOUBLE", "cnt_distinct_int BIGINT"); - - compileSqlAndExecutePlan( - "insert into MySink select name, " - + " max(`double`),\n" - + " count(distinct `int`) " - + "FROM TABLE (" - + " CUMULATE(\n" - + " TABLE MyTable,\n" - + " DESCRIPTOR(rowtime),\n" - + " INTERVAL '5' SECOND,\n" - + " INTERVAL '15' SECOND))" - + "GROUP BY name, window_start, window_end") - .await(); - - List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink"); - assertResult( - Arrays.asList( - "+I[a, 5.0, 3]", - "+I[a, 5.0, 4]", - "+I[a, 5.0, 4]", - "+I[b, 3.0, 1]", - "+I[b, 3.0, 1]", - "+I[b, 3.0, 1]", - "+I[b, 4.0, 1]", - "+I[b, 4.0, 1]", - "+I[b, 4.0, 1]", - "+I[b, 6.0, 2]", - "+I[b, 6.0, 2]", - "+I[null, 7.0, 1]", - "+I[null, 7.0, 1]", - "+I[null, 7.0, 1]"), - result); - } - - @TestTemplate - public void testEventTimeSessionWindow() throws Exception { - createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT"); - compileSqlAndExecutePlan( - "insert into MySink select\n" - + " name,\n" - + " COUNT(*)\n" - + "FROM TABLE(\n" - + " SESSION(\n" - + " TABLE MyTable PARTITION BY name,\n" - + " DESCRIPTOR(rowtime),\n" - + " INTERVAL '5' SECOND))" - + "GROUP BY name, window_start, window_end") - .await(); - - List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink"); - assertResult( - Arrays.asList("+I[a, 6]", "+I[b, 1]", "+I[b, 1]", "+I[b, 2]", "+I[null, 1]"), - result); - } -}
