[ 
https://issues.apache.org/jira/browse/FLINK-11013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16706958#comment-16706958
 ] 

ASF GitHub Bot commented on FLINK-11013:
----------------------------------------

asfgit closed pull request #7181: [FLINK-11013] [table] Fix distinct aggregates 
for group window in Table API
URL: https://github.com/apache/flink/pull/7181
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 84e3f79cec3..c0cfa247bca 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -633,6 +633,8 @@ case class WindowAggregate(
       case aggExpr: Aggregation
         if aggExpr.getSqlAggFunction.requiresOver =>
         failValidation(s"OVER clause is necessary for window functions: [${aggExpr.getClass}].")
+      case aggExpr: DistinctAgg =>
+        validateAggregateExpression(aggExpr.child)
       // check no nested aggregation exists.
       case aggExpr: Aggregation =>
         aggExpr.children.foreach { child =>
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala
index 671f8dd8d1d..afa9f8b0c79 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.api.stream.table
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.plan.logical.{SessionGroupWindow, SlidingGroupWindow, TumblingGroupWindow}
 import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.TableTestBase
@@ -238,4 +239,78 @@ class AggregateTest extends TableTestBase {
 
     util.verifyTable(resultTable, expected)
   }
+
+  @Test
+  def testDistinctAggregateOnTumbleWindow(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)](
+      "MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+    val result = table
+      .window(Tumble over 15.minute on 'rowtime as 'w)
+      .groupBy('w)
+      .select('a.count.distinct, 'a.sum)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "a", "rowtime")
+      ),
+      term("window", TumblingGroupWindow('w, 'rowtime, 900000.millis)),
+      term("select", "COUNT(DISTINCT a) AS TMP_0", "SUM(a) AS TMP_1")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testMultiDistinctAggregateSameFieldOnHopWindow(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)](
+      "MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+    val result = table
+      .window(Slide over 1.hour every 15.minute on 'rowtime as 'w)
+      .groupBy('w)
+      .select('a.count.distinct, 'a.sum.distinct, 'a.max.distinct)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "a", "rowtime")
+      ),
+      term("window", SlidingGroupWindow('w, 'rowtime, 3600000.millis, 900000.millis)),
+      term("select", "COUNT(DISTINCT a) AS TMP_0", "SUM(DISTINCT a) AS TMP_1",
+           "MAX(DISTINCT a) AS TMP_2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testDistinctAggregateWithGroupingOnSessionWindow(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)](
+      "MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+    val result = table
+      .window(Session withGap 15.minute on 'rowtime as 'w)
+      .groupBy('a, 'w)
+      .select('a, 'a.count, 'c.count.distinct)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "a", "c", "rowtime")
+      ),
+      term("groupBy", "a"),
+      term("window", SessionGroupWindow('w, 'rowtime, 900000.millis)),
+      term("select", "a", "COUNT(a) AS TMP_0", "COUNT(DISTINCT c) AS TMP_1")
+    )
+
+    util.verifyTable(result, expected)
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix distinct aggregates for group window in Table API
> -----------------------------------------------------
>
>                 Key: FLINK-11013
>                 URL: https://issues.apache.org/jira/browse/FLINK-11013
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.6.2
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>
> Currently, distinct aggregates do not work on group windows in the Table API.
> For the following query:
> {code:java}
> val table = util.addTable[(Int, Long, String)](
>   "MyTable",
>   'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
> val result = table
>   .window(Tumble over 15.minute on 'rowtime as 'w)
>   .groupBy('w)
>   .select('a.count.distinct, 'a.sum)
> {code}
> The following exception will be thrown:
> {code:java}
> org.apache.flink.table.api.ValidationException: It's not allowed to use an 
> aggregate function as input of another aggregate function
> at 
> org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:643)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:641)
> at org.apache.flink.table.plan.TreeNode.preOrderVisit(TreeNode.scala:82)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:641)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:640)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:640)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:654)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.validate(operators.scala:628)
> at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1206)
> at 
> org.apache.flink.table.api.stream.table.DistinctAggregateTest.testDistinctAggregateOnTumbleWindow(DistinctAggregateTest.scala:60)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
> at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
> at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to