[ 
https://issues.apache.org/jira/browse/FLINK-10845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708905#comment-16708905
 ] 

ASF GitHub Bot commented on FLINK-10845:
----------------------------------------

xueyumusic commented on a change in pull request #7079: [FLINK-10845][table] 
Support multiple different DISTINCT aggregates for batch
URL: https://github.com/apache/flink/pull/7079#discussion_r238720519
 
 

 ##########
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ##########
 @@ -191,7 +191,17 @@ object ScalarOperators {
           )
         )
     }
-  }  
+  }
+
+  def generateDistinctFrom(
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    val newleft = left.copy(nullTerm = GeneratedExpression.NEVER_NULL)
 
 Review comment:
   @twalthr I tried a similar approach earlier and ran into a problem. To check 
whether left is null, we have to generate left's code first. However, generateEqual 
(via generateOperatorIfNotNull) generates left's code as well, for example:
   ```
   ${left.code}
   ${right.code}
   if (${left.nullTerm}) {
     return ${right.nullTerm}
   } else if (${right.nullTerm}) {
     return ${left.nullTerm}
   } else {
     return generateEqual(left, right)
     // generateEqual emits ${left.code} and ${right.code} a second time
     // through generateOperatorIfNotNull
   }
   ```
   I don't know how to generate the left and right code only once without 
modifying generateOperatorIfNotNull.
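
   One possible way around the duplication, shown only as a minimal sketch: emit 
each operand's code once and inline the equality check rather than delegating to 
generateEqual. The `Expr` case class and `DistinctFromSketch` object below are 
hypothetical, simplified stand-ins and not Flink's actual GeneratedExpression or 
ScalarOperators API; the sketch also assumes primitive operands that can be 
compared with `==` in the generated Java.

```scala
// Hypothetical, simplified stand-in for Flink's GeneratedExpression.
// code: Java statements that compute the operand
// nullTerm: name of the boolean variable that is true when the operand is null
// resultTerm: name of the variable holding the operand's value
final case class Expr(code: String, nullTerm: String, resultTerm: String)

object DistinctFromSketch {

  // Builds Java source for `left IS NOT DISTINCT FROM right`.
  // Each operand's code appears exactly once in the output because the
  // equality check is inlined instead of calling a helper that emits it again.
  def isNotDistinctFrom(left: Expr, right: Expr, resultTerm: String): String =
    s"""
       |${left.code}
       |${right.code}
       |boolean $resultTerm;
       |if (${left.nullTerm} || ${right.nullTerm}) {
       |  // at least one side is null: not distinct only if both are null
       |  $resultTerm = ${left.nullTerm} && ${right.nullTerm};
       |} else {
       |  // assumption: primitive operands; reference types would need equals()
       |  $resultTerm = ${left.resultTerm} == ${right.resultTerm};
       |}
       |""".stripMargin
}
```

   Calling `DistinctFromSketch.isNotDistinctFrom(Expr("int a = 1; boolean aNull = false;", "aNull", "a"), Expr("int b = 2; boolean bNull = false;", "bNull", "b"), "result")` 
returns a Java snippet in which the two operand declarations each occur once. 
Whether inlining the comparison is acceptable here, as opposed to changing 
generateOperatorIfNotNull, is a design question this sketch does not settle.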

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Support DISTINCT aggregates for batch
> -------------------------------------
>
>                 Key: FLINK-10845
>                 URL: https://issues.apache.org/jira/browse/FLINK-10845
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: xueyu
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, we support distinct aggregates for streaming. However, executing 
> the same query on batch like the following test:
> {code}
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     val sqlQuery =
>       "SELECT b, " +
>       "  SUM(DISTINCT (a / 3)), " +
>       "  COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2))," +
>       "  COUNT(DISTINCT c) " +
>       "FROM MyTable " +
>       "GROUP BY b"
>     val data = new mutable.MutableList[(Int, Long, String)]
>     data.+=((1, 1L, "Hi"))
>     data.+=((2, 2L, "Hello"))
>     data.+=((3, 2L, "Hello world"))
>     data.+=((4, 3L, "Hello world, how are you?"))
>     data.+=((5, 3L, "I am fine."))
>     data.+=((6, 3L, "Luke Skywalker"))
>     data.+=((7, 4L, "Comment#1"))
>     data.+=((8, 4L, "Comment#2"))
>     data.+=((9, 4L, "Comment#3"))
>     data.+=((10, 4L, "Comment#4"))
>     data.+=((11, 5L, "Comment#5"))
>     data.+=((12, 5L, "Comment#6"))
>     data.+=((13, 5L, "Comment#7"))
>     data.+=((14, 5L, "Comment#8"))
>     data.+=((15, 5L, "Comment#9"))
>     data.+=((16, 6L, "Comment#10"))
>     data.+=((17, 6L, "Comment#11"))
>     data.+=((18, 6L, "Comment#12"))
>     data.+=((19, 6L, "Comment#13"))
>     data.+=((20, 6L, "Comment#14"))
>     data.+=((21, 6L, "Comment#15"))
>     val t = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
>     tEnv.registerTable("MyTable", t)
>     tEnv.sqlQuery(sqlQuery).toDataSet[Row].print()
> {code}
> Fails with:
> {code}
> org.apache.flink.table.codegen.CodeGenException: Unsupported call: IS NOT 
> DISTINCT FROM 
> If you think this function should be supported, you can create an issue and 
> start a discussion for it.
>       at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
>       at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
>       at scala.Option.getOrElse(Option.scala:121)
>       at 
> org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1027)
>       at 
> org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
>       at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
>       at 
> org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
>       at 
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addInnerJoin(DataSetJoin.scala:221)
>       at 
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:170)
>       at 
> org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
>       at 
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:165)
>       at 
> org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
>       at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
>       at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
>       at 
> org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:141)
>       at 
> org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:50)
>       at 
> org.apache.flink.table.runtime.stream.sql.SqlITCase.testDistinctGroupBy(SqlITCase.scala:2
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
