[ 
https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15885559#comment-15885559
 ] 

ASF GitHub Bot commented on FLINK-3475:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3111#discussion_r103170129
  
    --- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
 ---
    @@ -0,0 +1,275 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.batch.sql
    +
    +import org.apache.flink.table.utils.TableTestBase
    +import org.apache.flink.api.scala._
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.utils.TableTestUtil._
    +import org.junit.Test
    +
    +class DistinctAggregateTest extends TableTestBase {
    +
    +  @Test
    +  def testSingleDistinctAggregate(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    val sqlQuery = "SELECT COUNT(DISTINCT a) FROM MyTable"
    +
    +    val expected = unaryNode(
    +      "DataSetAggregate",
    +      unaryNode(
    +        "DataSetUnion",
    +        unaryNode(
    +          "DataSetValues",
    +          unaryNode(
    +            "DataSetAggregate",
    +            unaryNode(
    +              "DataSetCalc",
    +              batchTableNode(0),
    +              term("select", "a")
    +            ),
    +            term("groupBy", "a"),
    +            term("select", "a")
    +          ),
    +          tuples(List(null)),
    +          term("values", "a")
    +        ),
    +        term("union", "a")
    +      ),
    +      term("select", "COUNT(a) AS EXPR$0")
    +    )
    +
    +    util.verifySql(sqlQuery, expected)
    +  }
    +
    +  @Test
    +  def testMultiDistinctAggregateOnSameColumn(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), 
MAX(DISTINCT a) FROM MyTable"
    +
    +    val expected = unaryNode(
    +      "DataSetAggregate",
    +      unaryNode(
    +        "DataSetUnion",
    +        unaryNode(
    +          "DataSetValues",
    +          unaryNode(
    +            "DataSetAggregate",
    +            unaryNode(
    +              "DataSetCalc",
    +              batchTableNode(0),
    +              term("select", "a")
    +            ),
    +            term("groupBy", "a"),
    +            term("select", "a")
    +          ),
    +          tuples(List(null)),
    +          term("values", "a")
    +        ),
    +        term("union", "a")
    +      ),
    +      term("select", "COUNT(a) AS EXPR$0", "SUM(a) AS EXPR$1", "MAX(a) AS 
EXPR$2")
    +    )
    +
    +    util.verifySql(sqlQuery, expected)
    +  }
    +
    +  @Test
    +  def testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate(): Unit 
= {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    // case 0x00: DISTINCT on COUNT and Non-DISTINCT on others
    +    val sqlQuery0 = "SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable"
    +
    +    val expected0 = unaryNode(
    +      "DataSetAggregate",
    +      unaryNode(
    +        "DataSetUnion",
    +        unaryNode(
    +          "DataSetValues",
    +          unaryNode(
    +            "DataSetAggregate",
    +            unaryNode(
    +              "DataSetCalc",
    +              batchTableNode(0),
    +              term("select", "a", "b")
    +            ),
    +            term("groupBy", "a"),
    +            term("select", "a", "SUM(b) AS EXPR$1")
    +          ),
    +          tuples(List(null, null)),
    +          term("values", "a", "EXPR$1")
    +        ),
    +        term("union", "a", "EXPR$1")
    +      ),
    +      term("select", "COUNT(a) AS EXPR$0", "SUM(EXPR$1) AS EXPR$1")
    +    )
    +
    +    util.verifySql(sqlQuery0, expected0)
    +
    +    // case 0x01: Non-DISTINCT on COUNT and DISTINCT on others
    +    val sqlQuery1 = "SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable"
    +
    +    val expected1 = unaryNode(
    +      "DataSetAggregate",
    +      unaryNode(
    +        "DataSetUnion",
    +        unaryNode(
    +          "DataSetValues",
    +          unaryNode(
    +            "DataSetAggregate",
    +            unaryNode(
    +              "DataSetCalc",
    +              batchTableNode(0),
    +              term("select", "a", "b")
    +            ),
    +            term("groupBy", "b"),
    +            term("select", "b", "COUNT(a) AS EXPR$0")
    +          ),
    +          tuples(List(null, null)),
    +          term("values", "b", "EXPR$0")
    +        ),
    +        term("union", "b", "EXPR$0")
    +      ),
    +      term("select", "$SUM0(EXPR$0) AS EXPR$0", "SUM(b) AS EXPR$1")
    +    )
    +
    +    util.verifySql(sqlQuery1, expected1)
    +  }
    +
    +  @Test
    +  def testMultiDistinctAggregateOnDifferentColumn(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable"
    +
    +    val expected = binaryNode(
    +      "DataSetSingleRowJoin",
    +      unaryNode(
    +        "DataSetAggregate",
    +        unaryNode(
    +          "DataSetUnion",
    +          unaryNode(
    +            "DataSetValues",
    +            unaryNode(
    +              "DataSetAggregate",
    +              unaryNode(
    +                "DataSetCalc",
    +                batchTableNode(0),
    +                term("select", "a")
    +              ),
    +              term("groupBy", "a"),
    +              term("select", "a")
    +            ),
    +            tuples(List(null)),
    +            term("values", "a")
    +          ),
    +          term("union", "a")
    +        ),
    +        term("select", "COUNT(a) AS EXPR$0")
    +      ),
    +      unaryNode(
    +        "DataSetAggregate",
    +        unaryNode(
    +          "DataSetUnion",
    +          unaryNode(
    +            "DataSetValues",
    +            unaryNode(
    +              "DataSetAggregate",
    +              unaryNode(
    +                "DataSetCalc",
    +                batchTableNode(0),
    +                term("select", "b")
    +              ),
    +              term("groupBy", "b"),
    +              term("select", "b")
    +            ),
    +            tuples(List(null)),
    +            term("values", "b")
    +          ),
    +          term("union", "b")
    +        ),
    +        term("select", "SUM(b) AS EXPR$1")
    +      ),
    +      term("where", "true"),
    +      term("join", "EXPR$0", "EXPR$1"),
    +      term("joinType", "NestedLoopJoin")
    +    )
    +
    +    util.verifySql(sqlQuery, expected)
    +  }
    +
    +  @Test
    +  def testSingleDistinctAggregateWithGrouping(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    val sqlQuery = "SELECT a, COUNT(a), SUM(DISTINCT b) FROM MyTable GROUP 
BY a"
    +
    +    val expected = unaryNode(
    +      "DataSetAggregate",
    +      unaryNode(
    +        "DataSetAggregate",
    +        unaryNode(
    +          "DataSetCalc",
    +          batchTableNode(0),
    +          term("select", "a", "b")
    +        ),
    +        term("groupBy", "a", "b"),
    +        term("select", "a", "b", "COUNT(a) AS EXPR$1")
    +      ),
    +      term("groupBy", "a"),
    +      term("select", "a", "SUM(EXPR$1) AS EXPR$1", "SUM(b) AS EXPR$2")
    +    )
    +
    +    util.verifySql(sqlQuery, expected)
    +  }
    +
    +  @Test
    +  def testSingleDistinctAggregateWithGroupingAndCountStar(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    +
    +    val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT b) FROM MyTable GROUP 
BY a"
    --- End diff --
    
    Add two tests for `GROUP BY` with two distinct aggregates
    1. on same column
    2. on different column


> DISTINCT aggregate function support for SQL queries
> ---------------------------------------------------
>
>                 Key: FLINK-3475
>                 URL: https://issues.apache.org/jira/browse/FLINK-3475
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Chengxiang Li
>            Assignee: Zhenghua Gao
>
> DISTINCT aggregate function may be able to reuse the aggregate function 
> instead of separate implementation, and let Flink runtime take care of 
> duplicate records.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to