This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 95275e2ea0ca1bfd35e4638d52649074190a83e0
Author: beyond1920 <[email protected]>
AuthorDate: Thu Aug 1 17:38:19 2019 +0800

    [FLINK-13529][table-planner-blink] Remove the second parameter of 
FIRST_VALUE and LAST_VALUE
    
    According to ANSI-SQL, FIRST_VALUE and LAST_VALUE are ordered-set functions, 
which require the WITHIN GROUP clause to specify an order instead of passing the 
order field as a parameter.
    
    This closes #9316
---
 .../sql/SqlFirstLastValueAggFunction.java          | 34 +++++++----
 .../rules/logical/SplitAggregateRuleTest.scala     | 14 -----
 .../stream/sql/agg/DistinctAggregateTest.scala     | 15 -----
 .../runtime/stream/sql/AggregateITCase.scala       | 71 ----------------------
 .../runtime/stream/sql/OverWindowITCase.scala      | 37 +++++------
 .../runtime/stream/sql/SplitAggregateITCase.scala  | 22 -------
 6 files changed, 43 insertions(+), 150 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java
index e4b8a11..305f3e1 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.planner.functions.sql;
 
+import org.apache.flink.util.Preconditions;
+
 import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
 
 import org.apache.calcite.rel.type.RelDataType;
@@ -27,8 +29,8 @@ import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Optionality;
 
 import java.util.List;
 
@@ -36,30 +38,40 @@ import java.util.List;
  * <code>FIRST_VALUE</code> and <code>LAST_VALUE</code> aggregate functions
  * return the first or the last value in a list of values that are input to the
  * function.
+ *
+ * <p>NOTE: The difference between this and {@link 
org.apache.calcite.sql.fun.SqlFirstLastValueAggFunction}
+ * is that this can be used without over clause.
  */
 public class SqlFirstLastValueAggFunction extends SqlAggFunction {
 
-       public SqlFirstLastValueAggFunction(SqlKind sqlKind) {
-               super(sqlKind.name(),
+       public SqlFirstLastValueAggFunction(SqlKind kind) {
+               super(
+                               kind.name(),
                                null,
-                               sqlKind,
+                               kind,
                                ReturnTypes.ARG0_NULLABLE_IF_EMPTY,
                                null,
-                               OperandTypes.or(OperandTypes.ANY, 
OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.ANY)),
+                               OperandTypes.ANY,
                                SqlFunctionCategory.NUMERIC,
                                false,
-                               false);
+                               false,
+                               Optionality.FORBIDDEN);
+               Preconditions.checkArgument(kind == SqlKind.FIRST_VALUE
+                               || kind == SqlKind.LAST_VALUE);
        }
 
-       @Override
+       //~ Methods 
----------------------------------------------------------------
+
+       @SuppressWarnings("deprecation")
        public List<RelDataType> getParameterTypes(RelDataTypeFactory 
typeFactory) {
                return ImmutableList.of(
-                               
typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY),
 true),
-                               
typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY),
 true));
+                               typeFactory.createTypeWithNullability(
+                                               
typeFactory.createSqlType(SqlTypeName.ANY), true));
        }
 
-       @Override
+       @SuppressWarnings("deprecation")
        public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
-               return 
typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY),
 true);
+               return typeFactory.createTypeWithNullability(
+                               typeFactory.createSqlType(SqlTypeName.ANY), 
true);
        }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
index 5d58dfb..2680482 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
@@ -75,25 +75,11 @@ class SplitAggregateRuleTest extends TableTestBase {
   }
 
   @Test
-  def testSingleFirstValueWithOrderWithDistinctAgg(): Unit = {
-    // FIRST_VALUE with order is not splittable,
-    // so SplitAggregateRule can not be applied to the plan
-    util.verifyPlan("SELECT a, FIRST_VALUE(c, b), COUNT(DISTINCT b) FROM 
MyTable GROUP BY a")
-  }
-
-  @Test
   def testSingleLastValueWithDistinctAgg(): Unit = {
     util.verifyPlan("SELECT a, LAST_VALUE(c), COUNT(DISTINCT b) FROM MyTable 
GROUP BY a")
   }
 
   @Test
-  def testSingleLastValueWithOrderWithDistinctAgg(): Unit = {
-    // LAST_VALUE with order is not splittable,
-    // so SplitAggregateRule can not be applied to the plan
-    util.verifyPlan("SELECT a, LAST_VALUE(c, b), COUNT(DISTINCT b) FROM 
MyTable GROUP BY a")
-  }
-
-  @Test
   def testSingleConcatAggWithDistinctAgg(): Unit = {
     util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable 
GROUP BY a")
   }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
index 8e36305..5cbbab4 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
@@ -81,27 +81,12 @@ class DistinctAggregateTest(
   }
 
   @Test
-  def testSingleFirstValueWithOrderWithDistinctAgg(): Unit = {
-    // FIRST_VALUE is not mergeable, so the final plan does not contain local 
agg
-    // FIRST_VALUE with order is not splittable,
-    // so SplitAggregateRule can not be applied to the plan
-    util.verifyPlan("SELECT a, FIRST_VALUE(c, b), COUNT(DISTINCT b) FROM 
MyTable GROUP BY a")
-  }
-
-  @Test
   def testSingleLastValueWithDistinctAgg(): Unit = {
     // LAST_VALUE is not mergeable, so the final plan does not contain local 
agg
     util.verifyPlan("SELECT a, LAST_VALUE(c), COUNT(DISTINCT b) FROM MyTable 
GROUP BY a")
   }
 
   @Test
-  def testSingleLastValueWithOrderWithDistinctAgg(): Unit = {
-    // LAST_VALUE is not mergeable, so the final plan does not contain local 
agg
-    // LAST_VALUE with order is not splittable, so SplitAggregateRule can not 
be applied to the plan
-    util.verifyPlan("SELECT a, LAST_VALUE(c, b), COUNT(DISTINCT b) FROM 
MyTable GROUP BY a")
-  }
-
-  @Test
   def testSingleConcatAggWithDistinctAgg(): Unit = {
     util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable 
GROUP BY a")
   }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 41561a3..75178e5 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.table.planner.runtime.stream.sql
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
@@ -472,76 +471,6 @@ class AggregateITCase(
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
-  @Test
-  def testFirstLastWithOrder(): Unit = {
-    // set all operator parallelism to 1 to make sure the processed input 
element is in order
-    env.setParallelism(1)
-    val data = new mutable.MutableList[(Long, String, String, Int, Long, 
String)]
-    data.+=((2L, "u1", "i1", 0, 0L, "b1"))
-    data.+=((-1L, "u1", "i1", 1, 1L, "b1"))
-    data.+=((3L, "u2", "i1", 1, 1L, "b1"))
-    data.+=((4L, "u2", null, 0, 0L, "b1"))
-
-    val t = failingDataSource(data).toTable(tEnv, 'o, 'u, 'i, 'v, 's, 'b)
-    tEnv.registerTable("T", t)
-    val t1 = tEnv.sqlQuery(
-      """
-        |SELECT first_value(u, lo) as f, last_value(u, lo) as l
-        |FROM (
-        |  SELECT b, u, i, last_value(o) as lo, last_value(v, o) as lv,
-        |    first_value(o) as fo, first_value(v, o) as fv
-        |  FROM T
-        |  GROUP BY u, i, b)
-        |GROUP BY i
-      """.stripMargin)
-
-    val sink = new TestingRetractSink
-    t1.toRetractStream[Row].addSink(sink)
-    env.execute()
-
-    val expected = List(
-      "u1,u2",
-      "u2,u2")
-    assertEquals(expected.sorted, sink.getRetractResults.sorted)
-  }
-
-  @Test
-  def testFirstValueWithInputContainingNull(): Unit = {
-    val data = List(
-      Row.of("blond", null, Long.box(23L)),
-      Row.of("slim", null, Long.box(21L)),
-      Row.of("slim", null, Long.box(17L)),
-      Row.of("blond", null, Long.box(19L))
-    )
-
-    implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.LONG_TYPE_INFO,
-      BasicTypeInfo.LONG_TYPE_INFO) // tpe is automatically
-
-    val t = failingDataSource(data).toTable(tEnv, 't, 'name, 'age)
-    tEnv.registerTable("T", t)
-
-    /* use sql grammar to generate null input for firstValue,
-     * since fromCollection will throw exception when serializing null as Long
-     */
-    val t1 = tEnv.sqlQuery(
-      """
-        |SELECT t,
-        |first_value(name, age) as c,
-        |last_value(name, age) as d
-        |FROM T
-        |GROUP BY t
-      """.stripMargin)
-
-    val sink = new TestingRetractSink
-    t1.toRetractStream[Row].addSink(sink)
-    env.execute()
-
-    val expected = List("slim,null,null", "blond,null,null")
-    assertEquals(expected.sorted, sink.getRetractResults.sorted)
-  }
-
   /** test unbounded groupBy (without window) **/
   @Test
   def testUnboundedGroupBy(): Unit = {
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
index fddb45f..0c11794 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
@@ -339,10 +339,6 @@ class OverWindowITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBas
       "  c, b, " +
       "  LTCNT(a, CAST('4' AS BIGINT)) OVER (PARTITION BY c ORDER BY rowtime 
RANGE " +
       "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
-      "  first_value(a, a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
-      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
-      "  last_value(a, a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
-      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
       "  COUNT(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
       "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
       "  SUM(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
@@ -354,19 +350,26 @@ class OverWindowITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBas
     env.execute()
 
     val expected = List(
-      "Hello,1,0,1,1,1,1", "Hello,15,0,1,1,2,2", "Hello,16,0,1,1,3,3",
-      "Hello,2,0,1,2,6,9", "Hello,3,0,1,2,6,9", "Hello,2,0,1,2,6,9",
-      "Hello,3,0,2,3,4,9",
-      "Hello,4,0,3,4,2,7",
-      "Hello,5,1,4,5,2,9",
-      "Hello,6,2,5,6,2,11", "Hello,65,2,6,6,2,12",
-      "Hello,9,2,6,6,2,12", "Hello,9,2,6,6,2,12", "Hello,18,3,6,6,3,18",
-      "Hello World,17,3,7,7,3,21",
-      "Hello World,7,1,7,7,1,7",
-      "Hello World,77,3,7,7,3,21",
-      "Hello World,18,1,7,7,1,7",
-      "Hello World,8,2,7,8,2,15",
-      "Hello World,20,1,20,20,1,20")
+      "Hello,1,0,1,1",
+      "Hello,15,0,2,2",
+      "Hello,16,0,3,3",
+      "Hello,2,0,6,9",
+      "Hello,3,0,6,9",
+      "Hello,2,0,6,9",
+      "Hello,3,0,4,9",
+      "Hello,4,0,2,7",
+      "Hello,5,1,2,9",
+      "Hello,6,2,2,11",
+      "Hello,65,2,2,12",
+      "Hello,9,2,2,12",
+      "Hello,9,2,2,12",
+      "Hello,18,3,3,18",
+      "Hello World,17,3,3,21",
+      "Hello World,7,1,1,7",
+      "Hello World,77,3,3,21",
+      "Hello World,18,1,1,7",
+      "Hello World,8,2,2,15",
+      "Hello World,20,1,1,20")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
index 66bbfcc..6f7df33 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
@@ -195,28 +195,6 @@ class SplitAggregateITCase(
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
-  @Test
-  def testFirstValueLastValueWithRetraction(): Unit = {
-    val t1 = tEnv.sqlQuery(
-      s"""
-         |SELECT
-         |  b, FIRST_VALUE(c, a), LAST_VALUE(c, a), COUNT(DISTINCT c)
-         |FROM(
-         |  SELECT
-         |    a, COUNT(DISTINCT b) as b, MAX(b) as c
-         |  FROM T
-         |  GROUP BY a
-         |) GROUP BY b
-       """.stripMargin)
-
-    val sink = new TestingRetractSink
-    t1.toRetractStream[Row].addSink(sink)
-    env.execute()
-
-    val expected = List("2,2,6,2", "4,5,5,1", "1,5,5,1")
-    assertEquals(expected.sorted, sink.getRetractResults.sorted)
-  }
-
   @Ignore("[FLINK-12088]: JOIN is not supported")
   @Test
   def testAggWithJoin(): Unit = {

Reply via email to