[ 
https://issues.apache.org/jira/browse/FLINK-29380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiechenling closed FLINK-29380.
-------------------------------
    Fix Version/s:     (was: 1.17.0)
                       (was: 1.15.3)
       Resolution: Not A Bug

This is not a bug. Watermark idleness combined with back pressure causes the 
union operator's watermark to differ from the minimum of the upstream watermarks.
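
One way to picture the effect is the simplified model below. It is only a sketch and not Flink's actual implementation: `InputChannel` and `combinedWatermark` are hypothetical names, and the model assumes that a multi-input operator takes the minimum watermark over the inputs that are currently active, skipping inputs marked idle. Under that assumption, an input that is flagged idle (for example because back pressure keeps it from emitting records within a very short idle timeout) stops holding the combined watermark back.

```scala
// Simplified model of combining input watermarks; NOT Flink's implementation.
// InputChannel and combinedWatermark are hypothetical names for illustration.
final case class InputChannel(name: String, watermark: Long, idle: Boolean)

object CombinedWatermarkSketch {
  // Minimum watermark over the inputs that are currently active.
  // Returns None when every input is idle.
  def combinedWatermark(inputs: Seq[InputChannel]): Option[Long] = {
    val active = inputs.filterNot(_.idle)
    if (active.isEmpty) None else Some(active.map(_.watermark).min)
  }

  def main(args: Array[String]): Unit = {
    val now = System.currentTimeMillis()
    val oneHour = 60L * 60L * 1000L
    val stream1 = InputChannel("stream1", now, idle = false)
    val stream2 = InputChannel("stream2", now - oneHour, idle = false)

    // Both inputs active: the lagging input (stream2) determines the result.
    println(combinedWatermark(Seq(stream1, stream2)))

    // stream2 marked idle: only stream1 is considered, so the result jumps ahead.
    println(combinedWatermark(Seq(stream1, stream2.copy(idle = true))))
  }
}
```

In this model the operators upstream of the union each have a single input, so their own watermarks are unaffected; only the operator that merges several inputs sees the jump.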

> Two streams union, watermark error, not the minimum value
> ---------------------------------------------------------
>
>                 Key: FLINK-29380
>                 URL: https://issues.apache.org/jira/browse/FLINK-29380
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.16.0, 1.15.2
>            Reporter: xiechenling
>            Priority: Critical
>         Attachments: image-2022-09-21-17-59-01-846.png, screenshot-1.png, 
> screenshot-2.png, screenshot-3.png
>
>
> When two streams are unioned, the watermark of the union operator is wrong: it 
> is not the minimum of the input watermarks. With the connect operator, the 
> watermark is correct.
> !image-2022-09-21-17-59-01-846.png!
> This behavior seems related to watermark idleness. In Flink 1.13.1, the 
> watermark is correct whether or not idleness is configured. In Flink 1.15.2, the 
> watermark is correct when idleness is not set or is set to 1000 ms, but it is 
> wrong when idleness is set to 1 ms.
>  !screenshot-1.png! 
>  !screenshot-2.png! 
>  !screenshot-3.png! 
> What I don't understand is this: if the maximum watermark is emitted because of 
> idleness and that makes the union operator's watermark incorrect, why is the 
> watermark of the operators before the union still correct?
> {code:scala}
> import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
> import org.apache.flink.api.connector.source.Source
> import org.apache.flink.api.connector.source.lib.NumberSequenceSource
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.util.Collector
> import java.time.format.DateTimeFormatter
> import java.time.{Duration, Instant, ZoneId}
> import java.util
> object UnionWaterMarkTest {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration)
>     env.setParallelism(2)
>     val numberSequenceSource: Source[Long,
>       NumberSequenceSource.NumberSequenceSplit,
>       util.Collection[NumberSequenceSource.NumberSequenceSplit]] = new 
> NumberSequenceSource(0L, 100000000L)
>       .asInstanceOf[Source[Long,
>       NumberSequenceSource.NumberSequenceSplit,
>       util.Collection[NumberSequenceSource.NumberSequenceSplit]]]
>     val stream1 = env.fromSource(numberSequenceSource,
>       WatermarkStrategy
>         .forMonotonousTimestamps[Long]()
>         .withTimestampAssigner(new SerializableTimestampAssigner[Long] {
>           override def extractTimestamp(element: Long, recordTimestamp: 
> Long): Long = {
>             Instant.now().toEpochMilli
>           }
>         }),
>       "source"
>     )
>     val idleMillis = 1L
>     val stream2 = env.fromSource(numberSequenceSource,
>       WatermarkStrategy
>         .forMonotonousTimestamps[Long]()
>         .withTimestampAssigner(new SerializableTimestampAssigner[Long] {
>           override def extractTimestamp(element: Long, recordTimestamp: 
> Long): Long = {
>             Instant.now().toEpochMilli - (1000L * 60L * 60L)
>           }
>         })
>         .withIdleness(Duration.ofMillis(idleMillis))
>       ,
>       "source"
>     )
>     stream1
>       .process(new PrintWatermarkProcess("stream1"))
>       .returns(classOf[Long])
>       .startNewChain()
>       .union(
>         stream2
>           .process(new PrintWatermarkProcess("stream2"))
>           .returns(classOf[Long])
>           .startNewChain()
>           .process(new PrintWatermarkProcess("stream3"))
>           .returns(classOf[Long])
>           .startNewChain()
>       )
>       .process(new PrintWatermarkProcess("union"))
>       .returns(classOf[Long])
>       .filter(value => false)
>       .print()
>     env.execute()
>   }
> }
> // Pass-through function that reads the current watermark for each element.
> class PrintWatermarkProcess(operatorName: String) extends ProcessFunction[Long, Long] {
>   override def processElement(value: Long,
>                               ctx: ProcessFunction[Long, Long]#Context,
>                               out: Collector[Long]): Unit = {
>     out.collect(value)
>     val watermark = ctx.timerService().currentWatermark()
>     // Only format watermarks that fall in a plausible epoch-millisecond range.
>     if (watermark > 0 && watermark < 2222222222222L) {
>       val datetimeStr = DateTimeFormatter.ISO_LOCAL_DATE_TIME
>         .withZone(ZoneId.systemDefault())
>         .format(Instant.ofEpochMilli(watermark))
>       // Uncomment to log the watermark seen by this operator.
>       // println(operatorName + "  " + datetimeStr)
>     }
>   }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)