[ https://issues.apache.org/jira/browse/FLINK-6074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
sunjincheng closed FLINK-6074.
------------------------------
Resolution: Won't Fix
> Fix processFunction with watermark not work well in tableAPI
> ------------------------------------------------------------
>
> Key: FLINK-6074
> URL: https://issues.apache.org/jira/browse/FLINK-6074
> Project: Flink
> Issue Type: Bug
> Components: DataStream API, Table API & SQL
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> I did a simple test and found that the same
> `AssignerWithPunctuatedWatermarks` and `ProcessFunction` observe different
> watermarks when used through the SQL API and through the `DataStream` API, as
> follows:
> ProcessFunction:
> {code}
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.util.Collector
>
> // Prints the current watermark whenever an element arrives; emits nothing.
> class CheckWaterMark
>     extends ProcessFunction[(Long, Int, String), (Long, Int, String)] {
>
>   override def processElement(
>       value: (Long, Int, String),
>       ctx: ProcessFunction[(Long, Int, String), (Long, Int, String)]#Context,
>       out: Collector[(Long, Int, String)]): Unit = {
>     println("WaterMark=" + ctx.timerService.currentWatermark())
>   }
>
>   // No timers are registered, so this is not expected to be called.
>   override def onTimer(
>       timestamp: Long,
>       ctx: ProcessFunction[(Long, Int, String), (Long, Int, String)]#OnTimerContext,
>       out: Collector[(Long, Int, String)]): Unit = ???
> }
> {code}
> AssignerWithPunctuatedWatermarks:
> {code}
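> import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
> import org.apache.flink.streaming.api.watermark.Watermark
>
> // Uses the first tuple field as the event timestamp and returns a watermark
> // equal to that timestamp for every element.
> class TimestampWithLatenessWatermark
>     extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
>
>   override def checkAndGetNextWatermark(
>       lastElement: (Long, Int, String),
>       extractedTimestamp: Long): Watermark = {
>     new Watermark(extractedTimestamp)
>   }
>
>   override def extractTimestamp(
>       element: (Long, Int, String),
>       previousElementTimestamp: Long): Long = {
>     element._1
>   }
> }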
> {code}
> Test data:
> {code}
> val data = List(
>   (1L, 1, "Hello"),
>   (2L, 2, "Hello"),
>   (3L, 3, "Hello"),
>   (4L, 4, "Hello"),
>   (5L, 5, "Hello"),
>   (6L, 6, "Hello"),
>   (7L, 7, "Hello World"),
>   (8L, 8, "Hello World"),
>   (20L, 20, "Hello World"))
> {code}
> DataStreamAPI:
> {code}
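> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.scala._
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val src = env.fromCollection(data)
>   .assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark())
> // Key by the Int field and print the watermark seen by each element.
> src.keyBy(1).process(new CheckWaterMark())
> {code}
> Output:
> {code}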
> WaterMark=-9223372036854775808
> WaterMark=3
> WaterMark=5
> WaterMark=7
> WaterMark=1
> WaterMark=2
> WaterMark=4
> WaterMark=8
> WaterMark=6
> {code}
> SqlAPI:
> {code}
> import org.apache.flink.table.api.scala._
> import org.apache.flink.types.Row
>
> val src = env.fromCollection(data)
>   .assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark())
> val tab = src.toTable(tEnv).as('a, 'b, 'c)
> tEnv.registerTable("T1", tab)
> val sqlQuery = "SELECT " +
>   "count(a) OVER (ORDER BY ProcTime() " +
>   "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
>   "FROM T1"
> val result = tEnv.sql(sqlQuery).toDataStream[Row]
> {code}
> Output:
> {code}
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=-9223372036854775808
> WaterMark=2
> WaterMark=6
> {code}
> I suspect there is a problem in the translation from SQL to the DataStream API.
> Corrections are welcome if my usage is incorrect.
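A small, Flink-free Scala model can make the expected watermark values concrete. It is a sketch under stated assumptions, not Flink's implementation: it assumes the punctuated-watermark operator forwards each record before the watermark returned for it (emitting a watermark only when it advances), and that `currentWatermark()` in the keyed `ProcessFunction` returns the last watermark received before that record. `WatermarkCheck`, `StreamEvent`, `Record`, and `Mark` are hypothetical names. Print order is ignored because the report does not say which parallel instance printed each line.

```scala
// Hypothetical model (not Flink code) of what CheckWaterMark should print when
// fed by TimestampWithLatenessWatermark. Assumption: each record is forwarded
// before the watermark returned for it, and a watermark is emitted only if it
// advances past the previous one.
sealed trait StreamEvent
final case class Record(timestamp: Long) extends StreamEvent
final case class Mark(watermark: Long) extends StreamEvent

object WatermarkCheck {
  // Initial watermark, printed as -9223372036854775808 in the report.
  val InitialWatermark: Long = Long.MinValue

  // Record/watermark sequence emitted by the punctuated assigner (assumed order).
  def emitted(timestamps: Seq[Long]): Seq[StreamEvent] =
    timestamps.foldLeft((Vector.empty[StreamEvent], InitialWatermark)) {
      case ((acc, current), ts) if ts > current => (acc :+ Record(ts) :+ Mark(ts), ts)
      case ((acc, current), ts)                 => (acc :+ Record(ts), current)
    }._1

  // Watermark reported as each record arrives: the last watermark received
  // before that record on the same channel (assumption).
  def observed(events: Seq[StreamEvent]): Seq[Long] =
    events.foldLeft((Vector.empty[Long], InitialWatermark)) {
      case ((acc, wm), Record(_)) => (acc :+ wm, wm)
      case ((acc, _), Mark(w))    => (acc, w)
    }._1

  // Compare as sorted lists, since parallel subtasks may print in any order.
  def sameValues(printed: Seq[Long], expected: Seq[Long]): Boolean =
    printed.sorted == expected.sorted

  // Timestamps (first tuple field) of the test data.
  val timestamps: Seq[Long] = Seq(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 20L)

  // Values copied from the two output listings in the report.
  val dataStreamOutput: Seq[Long] = Seq(Long.MinValue, 3L, 5L, 7L, 1L, 2L, 4L, 8L, 6L)
  val sqlOutput: Seq[Long] = Seq.fill(7)(Long.MinValue) ++ Seq(2L, 6L)

  def main(args: Array[String]): Unit = {
    val expected = observed(emitted(timestamps))
    println(s"expected:   ${expected.mkString(", ")}")
    println(s"DataStream: ${sameValues(dataStreamOutput, expected)}")
    println(s"SQL:        ${sameValues(sqlOutput, expected)}")
  }
}
```

Under these assumptions the first record sees -9223372036854775808 and every later record sees the previous record's timestamp, so the expected values are -9223372036854775808 and 1 through 8. Running `main` prints `true` for the DataStream API output, which contains exactly these values in a different order, and `false` for the SQL API output.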
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)