This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 95275e2ea0ca1bfd35e4638d52649074190a83e0 Author: beyond1920 <[email protected]> AuthorDate: Thu Aug 1 17:38:19 2019 +0800 [FLINK-13529][table-planner-blink] Remove the second parameter of FIRST_VALUE and LAST_VALUE. According to ANSI SQL, FIRST_VALUE and LAST_VALUE are ordered-set functions, which require a WITHIN GROUP clause to specify the ordering instead of passing the order field as a parameter. This closes #9316 --- .../sql/SqlFirstLastValueAggFunction.java | 34 +++++++---- .../rules/logical/SplitAggregateRuleTest.scala | 14 ----- .../stream/sql/agg/DistinctAggregateTest.scala | 15 ----- .../runtime/stream/sql/AggregateITCase.scala | 71 ---------------------- .../runtime/stream/sql/OverWindowITCase.scala | 37 +++++------ .../runtime/stream/sql/SplitAggregateITCase.scala | 22 ------- 6 files changed, 43 insertions(+), 150 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java index e4b8a11..305f3e1 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java @@ -18,6 +18,8 @@ package org.apache.flink.table.planner.functions.sql; +import org.apache.flink.util.Preconditions; + import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList; import org.apache.calcite.rel.type.RelDataType; @@ -27,8 +29,8 @@ import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.type.OperandTypes; import org.apache.calcite.sql.type.ReturnTypes; -import org.apache.calcite.sql.type.SqlTypeFamily; import org.apache.calcite.sql.type.SqlTypeName; +import 
org.apache.calcite.util.Optionality; import java.util.List; @@ -36,30 +38,40 @@ import java.util.List; * <code>FIRST_VALUE</code> and <code>LAST_VALUE</code> aggregate functions * return the first or the last value in a list of values that are input to the * function. + * + * <p>NOTE: The difference between this and {@link org.apache.calcite.sql.fun.SqlFirstLastValueAggFunction} + * is that this can be used without over clause. */ public class SqlFirstLastValueAggFunction extends SqlAggFunction { - public SqlFirstLastValueAggFunction(SqlKind sqlKind) { - super(sqlKind.name(), + public SqlFirstLastValueAggFunction(SqlKind kind) { + super( + kind.name(), null, - sqlKind, + kind, ReturnTypes.ARG0_NULLABLE_IF_EMPTY, null, - OperandTypes.or(OperandTypes.ANY, OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.ANY)), + OperandTypes.ANY, SqlFunctionCategory.NUMERIC, false, - false); + false, + Optionality.FORBIDDEN); + Preconditions.checkArgument(kind == SqlKind.FIRST_VALUE + || kind == SqlKind.LAST_VALUE); } - @Override + //~ Methods ---------------------------------------------------------------- + + @SuppressWarnings("deprecation") public List<RelDataType> getParameterTypes(RelDataTypeFactory typeFactory) { return ImmutableList.of( - typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true), - typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true)); + typeFactory.createTypeWithNullability( + typeFactory.createSqlType(SqlTypeName.ANY), true)); } - @Override + @SuppressWarnings("deprecation") public RelDataType getReturnType(RelDataTypeFactory typeFactory) { - return typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true); + return typeFactory.createTypeWithNullability( + typeFactory.createSqlType(SqlTypeName.ANY), true); } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala index 5d58dfb..2680482 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala @@ -75,25 +75,11 @@ class SplitAggregateRuleTest extends TableTestBase { } @Test - def testSingleFirstValueWithOrderWithDistinctAgg(): Unit = { - // FIRST_VALUE with order is not splittable, - // so SplitAggregateRule can not be applied to the plan - util.verifyPlan("SELECT a, FIRST_VALUE(c, b), COUNT(DISTINCT b) FROM MyTable GROUP BY a") - } - - @Test def testSingleLastValueWithDistinctAgg(): Unit = { util.verifyPlan("SELECT a, LAST_VALUE(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a") } @Test - def testSingleLastValueWithOrderWithDistinctAgg(): Unit = { - // LAST_VALUE with order is not splittable, - // so SplitAggregateRule can not be applied to the plan - util.verifyPlan("SELECT a, LAST_VALUE(c, b), COUNT(DISTINCT b) FROM MyTable GROUP BY a") - } - - @Test def testSingleConcatAggWithDistinctAgg(): Unit = { util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a") } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala index 8e36305..5cbbab4 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala @@ -81,27 +81,12 @@ class DistinctAggregateTest( } @Test - def 
testSingleFirstValueWithOrderWithDistinctAgg(): Unit = { - // FIRST_VALUE is not mergeable, so the final plan does not contain local agg - // FIRST_VALUE with order is not splittable, - // so SplitAggregateRule can not be applied to the plan - util.verifyPlan("SELECT a, FIRST_VALUE(c, b), COUNT(DISTINCT b) FROM MyTable GROUP BY a") - } - - @Test def testSingleLastValueWithDistinctAgg(): Unit = { // LAST_VALUE is not mergeable, so the final plan does not contain local agg util.verifyPlan("SELECT a, LAST_VALUE(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a") } @Test - def testSingleLastValueWithOrderWithDistinctAgg(): Unit = { - // LAST_VALUE is not mergeable, so the final plan does not contain local agg - // LAST_VALUE with order is not splittable, so SplitAggregateRule can not be applied to the plan - util.verifyPlan("SELECT a, LAST_VALUE(c, b), COUNT(DISTINCT b) FROM MyTable GROUP BY a") - } - - @Test def testSingleConcatAggWithDistinctAgg(): Unit = { util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a") } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala index 41561a3..75178e5 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala @@ -17,7 +17,6 @@ */ package org.apache.flink.table.planner.runtime.stream.sql -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic @@ -472,76 +471,6 @@ class AggregateITCase( assertEquals(expected.sorted, 
sink.getRetractResults.sorted) } - @Test - def testFirstLastWithOrder(): Unit = { - // set all operator parallelism to 1 to make sure the processed input element is in order - env.setParallelism(1) - val data = new mutable.MutableList[(Long, String, String, Int, Long, String)] - data.+=((2L, "u1", "i1", 0, 0L, "b1")) - data.+=((-1L, "u1", "i1", 1, 1L, "b1")) - data.+=((3L, "u2", "i1", 1, 1L, "b1")) - data.+=((4L, "u2", null, 0, 0L, "b1")) - - val t = failingDataSource(data).toTable(tEnv, 'o, 'u, 'i, 'v, 's, 'b) - tEnv.registerTable("T", t) - val t1 = tEnv.sqlQuery( - """ - |SELECT first_value(u, lo) as f, last_value(u, lo) as l - |FROM ( - | SELECT b, u, i, last_value(o) as lo, last_value(v, o) as lv, - | first_value(o) as fo, first_value(v, o) as fv - | FROM T - | GROUP BY u, i, b) - |GROUP BY i - """.stripMargin) - - val sink = new TestingRetractSink - t1.toRetractStream[Row].addSink(sink) - env.execute() - - val expected = List( - "u1,u2", - "u2,u2") - assertEquals(expected.sorted, sink.getRetractResults.sorted) - } - - @Test - def testFirstValueWithInputContainingNull(): Unit = { - val data = List( - Row.of("blond", null, Long.box(23L)), - Row.of("slim", null, Long.box(21L)), - Row.of("slim", null, Long.box(17L)), - Row.of("blond", null, Long.box(19L)) - ) - - implicit val tpe: TypeInformation[Row] = new RowTypeInfo( - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO) // tpe is automatically - - val t = failingDataSource(data).toTable(tEnv, 't, 'name, 'age) - tEnv.registerTable("T", t) - - /* use sql grammar to generate null input for firstValue, - * since fromCollection will throw exception when serializing null as Long - */ - val t1 = tEnv.sqlQuery( - """ - |SELECT t, - |first_value(name, age) as c, - |last_value(name, age) as d - |FROM T - |GROUP BY t - """.stripMargin) - - val sink = new TestingRetractSink - t1.toRetractStream[Row].addSink(sink) - env.execute() - - val expected = List("slim,null,null", 
"blond,null,null") - assertEquals(expected.sorted, sink.getRetractResults.sorted) - } - /** test unbounded groupBy (without window) **/ @Test def testUnboundedGroupBy(): Unit = { diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala index fddb45f..0c11794 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala @@ -339,10 +339,6 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas " c, b, " + " LTCNT(a, CAST('4' AS BIGINT)) OVER (PARTITION BY c ORDER BY rowtime RANGE " + " BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " + - " first_value(a, a) OVER (PARTITION BY c ORDER BY rowtime RANGE " + - " BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " + - " last_value(a, a) OVER (PARTITION BY c ORDER BY rowtime RANGE " + - " BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " + " COUNT(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " + " BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " + " SUM(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " + @@ -354,19 +350,26 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas env.execute() val expected = List( - "Hello,1,0,1,1,1,1", "Hello,15,0,1,1,2,2", "Hello,16,0,1,1,3,3", - "Hello,2,0,1,2,6,9", "Hello,3,0,1,2,6,9", "Hello,2,0,1,2,6,9", - "Hello,3,0,2,3,4,9", - "Hello,4,0,3,4,2,7", - "Hello,5,1,4,5,2,9", - "Hello,6,2,5,6,2,11", "Hello,65,2,6,6,2,12", - "Hello,9,2,6,6,2,12", "Hello,9,2,6,6,2,12", "Hello,18,3,6,6,3,18", - "Hello World,17,3,7,7,3,21", - "Hello World,7,1,7,7,1,7", - "Hello World,77,3,7,7,3,21", - "Hello 
World,18,1,7,7,1,7", - "Hello World,8,2,7,8,2,15", - "Hello World,20,1,20,20,1,20") + "Hello,1,0,1,1", + "Hello,15,0,2,2", + "Hello,16,0,3,3", + "Hello,2,0,6,9", + "Hello,3,0,6,9", + "Hello,2,0,6,9", + "Hello,3,0,4,9", + "Hello,4,0,2,7", + "Hello,5,1,2,9", + "Hello,6,2,2,11", + "Hello,65,2,2,12", + "Hello,9,2,2,12", + "Hello,9,2,2,12", + "Hello,18,3,3,18", + "Hello World,17,3,3,21", + "Hello World,7,1,1,7", + "Hello World,77,3,3,21", + "Hello World,18,1,1,7", + "Hello World,8,2,2,15", + "Hello World,20,1,1,20") assertEquals(expected.sorted, sink.getAppendResults.sorted) } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala index 66bbfcc..6f7df33 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala @@ -195,28 +195,6 @@ class SplitAggregateITCase( assertEquals(expected.sorted, sink.getRetractResults.sorted) } - @Test - def testFirstValueLastValueWithRetraction(): Unit = { - val t1 = tEnv.sqlQuery( - s""" - |SELECT - | b, FIRST_VALUE(c, a), LAST_VALUE(c, a), COUNT(DISTINCT c) - |FROM( - | SELECT - | a, COUNT(DISTINCT b) as b, MAX(b) as c - | FROM T - | GROUP BY a - |) GROUP BY b - """.stripMargin) - - val sink = new TestingRetractSink - t1.toRetractStream[Row].addSink(sink) - env.execute() - - val expected = List("2,2,6,2", "4,5,5,1", "1,5,5,1") - assertEquals(expected.sorted, sink.getRetractResults.sorted) - } - @Ignore("[FLINK-12088]: JOIN is not supported") @Test def testAggWithJoin(): Unit = {
