[
https://issues.apache.org/jira/browse/FLINK-25471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
zhangzh updated FLINK-25471:
----------------------------
Summary: wrong result if table toDataStream then keyBy sum in Batch Mode
(was: wrong result if table toDataStream then keyBy sum)
> wrong result if table toDataStream then keyBy sum in Batch Mode
> ---------------------------------------------------------------
>
> Key: FLINK-25471
> URL: https://issues.apache.org/jira/browse/FLINK-25471
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.14.2
> Reporter: zhangzh
> Priority: Critical
>
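> I have six rows like these:
> Row.of("Alice"),
> Row.of("alice"),
> Row.of("Bob"),
> Row.of("lily"),
> Row.of("lily"),
> Row.of("lily")
> which I convert into a table with a single column named "word". The program below runs in STREAMING mode by default; the wrong result is reported when the runtime mode is BATCH.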
>
>
>
>
>
>
>
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

/**
 * Reproducer for FLINK-25471.
 *
 * Upper-cases a handful of words through the Table API (SQL `UPPER`), converts the
 * result back to a DataStream with `toDataStream`, and counts occurrences per word
 * with `keyBy(...).sum(...)`.
 */
object TableToDataStreamBatchWordCount {

  /**
   * Builds and executes the word-count job.
   *
   * @param args optional; `args(0)` selects the runtime mode by
   *             [[RuntimeExecutionMode]] constant name (case-insensitive, e.g. `batch`).
   *             When no argument is given the job runs in `STREAMING` mode, as before.
   * @throws IllegalArgumentException if `args(0)` is not a valid RuntimeExecutionMode name
   */
  def main(args: Array[String]): Unit = {
    // The issue is specific to BATCH mode, so allow switching modes without editing code.
    val runtimeMode = args.headOption
      .map(mode => RuntimeExecutionMode.valueOf(mode.trim.toUpperCase(java.util.Locale.ROOT)))
      .getOrElse(RuntimeExecutionMode.STREAMING)

    // Create the stream environment and the table environment bridged onto it.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(runtimeMode)
    env.setParallelism(7)
    val tableEnv = StreamTableEnvironment.create(env)

    // Six input rows, each with a single STRING field.
    val words = env.fromElements(
      Row.of("Alice"),
      Row.of("alice"),
      Row.of("Bob"),
      Row.of("lily"),
      Row.of("lily"),
      Row.of("lily")
    )(Types.ROW(Types.STRING))

    // DataStream[Row] -> Table with column "word" -> SQL upper-casing transform.
    val viewName = "tmp_table"
    tableEnv.createTemporaryView(viewName, tableEnv.fromDataStream(words).as("word"))
    val upperTable = tableEnv.sqlQuery(s"select UPPER(word) as word from $viewName")

    // Transformed Table -> DataStream[String], reading the column by name.
    val upperWords: DataStream[String] =
      tableEnv.toDataStream(upperTable).map(row => row.getFieldAs[String]("word"))

    // Per-word count via keyBy + running sum on the count field.
    val counts: DataStream[(String, Int)] = upperWords
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)

    counts.print()
    env.execute("WordCount")
  }
}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)