[ 
https://issues.apache.org/jira/browse/FLINK-25471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangzh updated FLINK-25471:
----------------------------
    Summary: wrong result if table toDataStream then keyey by sum in Batch Mode 
 (was: wrong result if table toDataStream then keyey by sum)

> wrong result if table toDataStream then keyey by sum in Batch Mode
> ------------------------------------------------------------------
>
>                 Key: FLINK-25471
>                 URL: https://issues.apache.org/jira/browse/FLINK-25471
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.14.2
>            Reporter: zhangzh
>            Priority: Critical
>
> I have 6 lines like this:
> Row.of("Alice"),
> Row.of("alice"),
> Row.of("Bob"),
> Row.of("lily"),
> Row.of("lily"),
> Row.of("lily")
> then  make it to  table  with one colums "wrod"
>  
>  
>  
>  
>  
>  
>  
> import org.apache.flink.api.common.RuntimeExecutionMode
> import org.apache.flink.api.common.functions.\{MapFunction, ReduceFunction}
> import org.apache.flink.api.scala.typeutils.Types
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.types.Row
> object TableToDataStreamBatchWordCount {
> def main(args: Array[String]) {
> //create env and tableEnv
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
> //env.setRuntimeMode(RuntimeExecutionMode.BATCH)
> env.setParallelism(7)
> val tableEnv = StreamTableEnvironment.create(env)
> // make data ,3 line
> val resultDS2 = env.fromElements(
> Row.of("Alice"),
> Row.of("alice"),
> Row.of("Bob"),
> Row.of("lily"),
> Row.of("lily"),
> Row.of("lily")
> )(Types.ROW(Types.STRING))
> // dataStream[Row] --> Table --> sql to upper transform table
> val table = tableEnv.fromDataStream(resultDS2).as("word")
> tableEnv.createTemporaryView(s"tmp_table",table)
> val resultTable = tableEnv.sqlQuery(s" select UPPER(word) as word from 
> tmp_table ")
> // sql transformed table --> DataStream[String]
> val resultDs = tableEnv.toDataStream(resultTable).map(row => {
> row.getField("word").asInstanceOf[String]
> })
> // keyby reduce
> val counts: DataStream[(String, Int)] = resultDs
> .map((_, 1))
> .keyBy(_._1)
> .sum(1)
> // print result
> counts.print()
> env.execute("WordCount")
> }
> }



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to