This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 72e9f5b70030317e03f3e5fe57543ce970be43f2 Author: beyond1920 <[email protected]> AuthorDate: Fri Aug 2 10:24:35 2019 +0800 [FLINK-13529][table-planner-blink] Remove APPROX_COUNT_DISTINCT and INCR_SUM in blink planner - Remove APPROX_COUNT_DISTINCT for now because we still don't support it yet. - Remove INCR_SUM because it is not a standard aggregate function. This closes #9316 --- .../functions/sql/FlinkSqlOperatorTable.java | 6 -- .../functions/sql/SqlIncrSumAggFunction.java | 75 ---------------------- .../metadata/FlinkRelMdModifiedMonotonicity.scala | 2 - .../planner/plan/utils/AggFunctionFactory.scala | 4 +- .../FlinkRelMdModifiedMonotonicityTest.scala | 11 ---- .../table/planner/plan/stream/sql/RankTest.scala | 75 ---------------------- .../sql/agg/SortDistinctAggregateITCase.scala | 10 +-- .../runtime/stream/sql/AggregateITCase.scala | 36 ----------- 8 files changed, 2 insertions(+), 217 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java index dbabb69..e201400 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java @@ -950,11 +950,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable { */ public static final SqlListAggFunction LISTAGG = new SqlListAggFunction(); - /** - * <code>INCR_SUM</code> aggregate function. - */ - public static final SqlIncrSumAggFunction INCR_SUM = new SqlIncrSumAggFunction(); - // ----------------------------------------------------------------------------- // Window SQL functions // ----------------------------------------------------------------------------- @@ -1072,7 +1067,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable { public static final SqlAggFunction SUM = SqlStdOperatorTable.SUM; public static final SqlAggFunction SUM0 = SqlStdOperatorTable.SUM0; public static final SqlAggFunction COUNT = SqlStdOperatorTable.COUNT; - public static final SqlAggFunction APPROX_COUNT_DISTINCT = SqlStdOperatorTable.APPROX_COUNT_DISTINCT; public static final SqlAggFunction COLLECT = SqlStdOperatorTable.COLLECT; public static final SqlAggFunction MIN = SqlStdOperatorTable.MIN; public static final SqlAggFunction MAX = SqlStdOperatorTable.MAX; diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlIncrSumAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlIncrSumAggFunction.java deleted file mode 100644 index 0b92186..0000000 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlIncrSumAggFunction.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.functions.sql; - -import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList; - -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.sql.SqlAggFunction; -import org.apache.calcite.sql.SqlFunctionCategory; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.SqlSplittableAggFunction; -import org.apache.calcite.sql.type.OperandTypes; -import org.apache.calcite.sql.type.ReturnTypes; -import org.apache.calcite.sql.type.SqlTypeName; - -import java.util.List; - -/** - * <code>INCR_SUM</code> is an aggregator which returns the sum of the values which - * go into it like SUM. It differs in that the modified monotonicity of - * INCR_SUM is INCREASING, while that of SUM should be inferred using - * extra information. - */ -public class SqlIncrSumAggFunction extends SqlAggFunction { - - public SqlIncrSumAggFunction() { - super( - "INCR_SUM", - null, - SqlKind.SUM, - ReturnTypes.AGG_SUM, - null, - OperandTypes.NUMERIC, - SqlFunctionCategory.NUMERIC, - false, - false); - } - - @Override - public List<RelDataType> getParameterTypes(RelDataTypeFactory typeFactory) { - return ImmutableList.of( - typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true)); - } - - @Override - public RelDataType getReturnType(RelDataTypeFactory typeFactory) { - return typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true); - } - - @Override - public <T> T unwrap(Class<T> clazz) { - if (clazz == SqlSplittableAggFunction.class) { - return clazz.cast(SqlSplittableAggFunction.CountSplitter.INSTANCE); - } else { - return super.unwrap(clazz); - } - } -} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala index 57de91f..9c6c77a 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala @@ -19,7 +19,6 @@ package org.apache.flink.table.planner.plan.metadata import org.apache.flink.table.planner.calcite.FlinkTypeFactory -import org.apache.flink.table.planner.functions.sql.SqlIncrSumAggFunction import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity import org.apache.flink.table.planner.plan.metadata.FlinkMetadata.ModifiedMonotonicity @@ -337,7 +336,6 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon case SqlKind.MIN => DECREASING case _ => NOT_MONOTONIC } - case _: SqlIncrSumAggFunction => INCREASING case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => val valueInterval = fmq.getFilteredColumnInterval( input, aggCall.getArgList.head, aggCall.filterArg) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala index a8e00d3..e8707f2 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala @@ -31,7 +31,7 @@ import org.apache.flink.table.planner.functions.aggfunctions.MinWithRetractAggFu import org.apache.flink.table.planner.functions.aggfunctions.SingleValueAggFunction._ import org.apache.flink.table.planner.functions.aggfunctions.SumWithRetractAggFunction._ import org.apache.flink.table.planner.functions.aggfunctions._ -import org.apache.flink.table.planner.functions.sql.{SqlListAggFunction, SqlFirstLastValueAggFunction, SqlIncrSumAggFunction} +import org.apache.flink.table.planner.functions.sql.{SqlListAggFunction, SqlFirstLastValueAggFunction} import org.apache.flink.table.planner.functions.utils.AggSqlFunction import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter import org.apache.flink.table.runtime.typeutils.DecimalTypeInfo @@ -75,8 +75,6 @@ class AggFunctionFactory( case _: SqlSumEmptyIsZeroAggFunction => createSum0AggFunction(argTypes) - case _: SqlIncrSumAggFunction => createIncrSumAggFunction(argTypes, index) - case a: SqlMinMaxAggFunction if a.getKind == SqlKind.MIN => createMinAggFunction(argTypes, index) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala index cdd0191..3f79b07 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala @@ -18,7 +18,6 @@ package org.apache.flink.table.planner.plan.metadata -import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankType} @@ -171,16 +170,6 @@ class FlinkRelMdModifiedMonotonicityTest extends FlinkRelMdHandlerTestBase { mq.getRelModifiedMonotonicity(aggWithAvg) ) - // incr_sum agg - val aggWithIncrSum = relBuilder.scan("MyTable3").aggregate( - relBuilder.groupKey(relBuilder.field("a")), - relBuilder.aggregateCall(FlinkSqlOperatorTable.INCR_SUM, false, null, - "incr_sum_b", relBuilder.field("b"))).build() - assertEquals( - new RelModifiedMonotonicity(Array(CONSTANT, INCREASING)), - mq.getRelModifiedMonotonicity(aggWithIncrSum) - ) - // test monotonicity lost because group by a agg field // select max_c, max(sum_d) as max_sum_d from ( // select a, b, max(c) as max_c, sum(d) as sum_d from MyTable4 group by a, b diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala index e6624fb..bcea370 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala @@ -579,28 +579,6 @@ class RankTest extends TableTestBase { } @Test - def testTopNOrderByIncrSum(): Unit = { - val subquery = - """ - |SELECT a, b, incr_sum(c) as sum_c - |FROM MyTable - |GROUP BY a, b - """.stripMargin - - val sql = - s""" - |SELECT * - |FROM ( - | SELECT a, b, sum_c, - | ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) AS row_num - | FROM ($subquery)) - |WHERE row_num <= 10 - """.stripMargin - - util.verifyPlanWithTrait(sql) - } - - @Test def testNestedTopN(): Unit = { val subquery = """ @@ -661,58 +639,5 @@ class RankTest extends TableTestBase { util.verifyPlanWithTrait(sql) } - @Test - def testTopNWithoutRowNumber2(): Unit = { - util.addTableSource[(String, String, String, String, Long, String, Long, String)]( - "stream_source", - 'seller_id, 'sku_id, 'venture, 'stat_date, 'trd_amt, 'trd_buyer_id, 'log_pv, 'log_visitor_id) - - val group_sql = - """ - |SELECT - | seller_id - | ,sku_id - | ,venture - | ,stat_date - | ,incr_sum(trd_amt) AS amt_dtr - | ,COUNT(DISTINCT trd_buyer_id) AS byr_cnt_dtr - | ,SUM(log_pv) AS pv_dtr - | ,COUNT(DISTINCT log_visitor_id) AS uv_dtr - |FROM stream_source - |GROUP BY seller_id,sku_id,venture,stat_date - """.stripMargin - - val sql = - s""" - |SELECT - | CONCAT(seller_id, venture, stat_date, sku_id) as rowkey, - | seller_id, - | sku_id, - | venture, - | stat_date, - | amt_dtr, - | byr_cnt_dtr, - | pv_dtr, - | uv_dtr - |FROM ( - | SELECT - | seller_id, - | sku_id, - | venture, - | stat_date, - | amt_dtr, - | byr_cnt_dtr, - | pv_dtr, - | uv_dtr, - | ROW_NUMBER() OVER (PARTITION BY seller_id, venture, stat_date - | ORDER BY amt_dtr DESC) AS rownum - | FROM ($group_sql) - |) - |WHERE rownum <= 10 - """.stripMargin - - util.verifyPlanWithTrait(sql) - } - // TODO add tests about multi-sinks and udf } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala index b5c51e1..d9ccb89 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala @@ -25,7 +25,7 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMergeAndReset import org.apache.flink.table.planner.utils.{CountAggFunction, IntSumAggFunction} -import org.junit.{Ignore, Test} +import org.junit.Test import scala.collection.Seq @@ -86,12 +86,4 @@ class SortDistinctAggregateITCase extends DistinctAggregateITCaseBase { ) } - @Ignore - @Test - def testApproximateCountDistinct(): Unit = { - checkResult( - "SELECT APPROX_COUNT_DISTINCT(b) FROM Table3", - Seq(row(6)) - ) - } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala index 75178e5..7d8796d 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala @@ -398,42 +398,6 @@ class AggregateITCase( } @Test - def testIncrSum(): Unit = { - val data = new mutable.MutableList[(Int, Long, String)] - data.+=((1, 1L, "A")) - data.+=((-2, 2L, "B")) - data.+=((3, 2L, "B")) - data.+=((-4, 3L, "C")) - data.+=((5, 3L, "C")) - data.+=((6, 3L, "C")) - data.+=((-7, 4L, "B")) - data.+=((8, 4L, "A")) - data.+=((9, 4L, "D")) - data.+=((10, 4L, "E")) - data.+=((-11, 5L, "A")) - data.+=((12, 5L, "B")) - - val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c) - tEnv.registerTable("T", t) - - val sql = - """ - |SELECT b, incr_sum(a) - |FROM T - |GROUP BY b - """.stripMargin - - val t1 = tEnv.sqlQuery(sql) - val sink = new TestingRetractSink - t1.toRetractStream[Row].addSink(sink) - env.execute() - - val expected = List("1,1", "2,3", "3,11", "4,27", "5,12") - assertEquals(expected.sorted, sink.getRetractResults.sorted) - - } - - @Test def testNestedGroupByAgg(): Unit = { val data = new mutable.MutableList[(Int, Long, String)] data.+=((1, 1L, "A"))
