[ 
https://issues.apache.org/jira/browse/FLINK-25471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangzh updated FLINK-25471:
----------------------------
    Description: 
I have 6 lines like this:

Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")

then  make it to  table  with one colums  "wrod"

 

 

 

  was:
I have 6 lines like this:

Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")

then  make it to  table  with one colums "wrod"

 

 

 

 

 

 

 




import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.functions.\{MapFunction, ReduceFunction}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

object TableToDataStreamBatchWordCount {

def main(args: Array[String]) {


//create env and tableEnv
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
//env.setRuntimeMode(RuntimeExecutionMode.BATCH)
env.setParallelism(7)
val tableEnv = StreamTableEnvironment.create(env)

// make data ,3 line
val resultDS2 = env.fromElements(
Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")
)(Types.ROW(Types.STRING))

// dataStream[Row] --> Table --> sql to upper transform table
val table = tableEnv.fromDataStream(resultDS2).as("word")
tableEnv.createTemporaryView(s"tmp_table",table)
val resultTable = tableEnv.sqlQuery(s" select UPPER(word) as word from 
tmp_table ")

// sql transformed table --> DataStream[String]
val resultDs = tableEnv.toDataStream(resultTable).map(row => {
row.getField("word").asInstanceOf[String]
})


// keyby reduce
val counts: DataStream[(String, Int)] = resultDs
.map((_, 1))
.keyBy(_._1)
.sum(1)

// print result
counts.print()

env.execute("WordCount")
}
}


> wrong result if table toDataStream then keyey by sum in Batch Mode
> ------------------------------------------------------------------
>
>                 Key: FLINK-25471
>                 URL: https://issues.apache.org/jira/browse/FLINK-25471
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.14.2
>            Reporter: zhangzh
>            Priority: Critical
>
> I have 6 lines like this:
> Row.of("Alice"),
> Row.of("alice"),
> Row.of("Bob"),
> Row.of("lily"),
> Row.of("lily"),
> Row.of("lily")
> then  make it to  table  with one colums  "wrod"
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to