[
https://issues.apache.org/jira/browse/FLINK-29380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
xiechenling closed FLINK-29380.
-------------------------------
Fix Version/s: (was: 1.17.0)
(was: 1.15.3)
Resolution: Not A Bug
This is not a bug. Watermark idleness combined with back pressure causes the
watermark to differ from the upstream minimum.
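
To illustrate the resolution, here is a minimal sketch of how a multi-input operator could combine its input watermarks when idle inputs are excluded. It is a simplified model, not Flink's actual implementation: `InputChannel`, `combinedWatermark`, and the timestamp values are hypothetical names and numbers chosen for the example.

```scala
// Simplified model of watermark combination; NOT Flink's real code.
// InputChannel and combinedWatermark are hypothetical names for this sketch.
final case class InputChannel(name: String, watermark: Long, idle: Boolean)

object UnionWatermarkSketch {
  // Combined watermark = minimum over the inputs that are not idle.
  // Returns None when every input is idle, since no minimum can be derived.
  def combinedWatermark(inputs: Seq[InputChannel]): Option[Long] = {
    val active = inputs.filterNot(_.idle)
    if (active.isEmpty) None else Some(active.map(_.watermark).min)
  }

  def main(args: Array[String]): Unit = {
    val now = 10000000L              // hypothetical "current" timestamp
    val oneHour = 1000L * 60L * 60L  // same offset as stream2 in the report
    val stream1 = InputChannel("stream1", now, idle = false)
    val stream2 = InputChannel("stream2", now - oneHour, idle = false)

    // Both inputs active: the slower stream2 holds the watermark back.
    println(combinedWatermark(Seq(stream1, stream2)))
    // stream2 marked idle: only stream1 is considered.
    println(combinedWatermark(Seq(stream1, stream2.copy(idle = true))))
  }
}
```

Under this model, if back pressure delays stream2 for longer than the 1 ms idleness timeout, stream2 is treated as idle and the union follows stream1 alone, so its watermark is no longer the minimum of both inputs.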
> Two streams union, watermark error, not the minimum value
> ---------------------------------------------------------
>
> Key: FLINK-29380
> URL: https://issues.apache.org/jira/browse/FLINK-29380
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.16.0, 1.15.2
> Reporter: xiechenling
> Priority: Critical
> Attachments: image-2022-09-21-17-59-01-846.png, screenshot-1.png,
> screenshot-2.png, screenshot-3.png
>
>
> When two streams are unioned, the watermark of the union operator is wrong:
> it is not the minimum of the input watermarks. With the connect operator,
> the watermark is correct.
> !image-2022-09-21-17-59-01-846.png!
> This seems related to watermark idleness. In Flink 1.13.1, the watermark is
> correct whether or not idleness is configured. In Flink 1.15.2, the watermark
> is correct when idleness is not set or is set to 1000 ms, but wrong when
> idleness is set to 1 ms.
> !screenshot-1.png!
> !screenshot-2.png!
> !screenshot-3.png!
> What I don't understand is this: if idleness is what causes the maximum
> watermark to be emitted, making the union operator's watermark incorrect,
> why is the watermark of the operator before the union operator still correct?
> {code:scala}
> import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
> import org.apache.flink.api.connector.source.Source
> import org.apache.flink.api.connector.source.lib.NumberSequenceSource
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.util.Collector
> import java.time.format.DateTimeFormatter
> import java.time.{Duration, Instant, ZoneId}
> import java.util
>
> object UnionWaterMarkTest {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration)
>     env.setParallelism(2)
>
>     val numberSequenceSource: Source[Long,
>       NumberSequenceSource.NumberSequenceSplit,
>       util.Collection[NumberSequenceSource.NumberSequenceSplit]] =
>       new NumberSequenceSource(0L, 100000000L)
>         .asInstanceOf[Source[Long,
>           NumberSequenceSource.NumberSequenceSplit,
>           util.Collection[NumberSequenceSource.NumberSequenceSplit]]]
>
>     val stream1 = env.fromSource(numberSequenceSource,
>       WatermarkStrategy
>         .forMonotonousTimestamps[Long]()
>         .withTimestampAssigner(new SerializableTimestampAssigner[Long] {
>           override def extractTimestamp(element: Long, recordTimestamp: Long): Long = {
>             Instant.now().toEpochMilli
>           }
>         }),
>       "source"
>     )
>
>     val idleMillis = 1L
>     val stream2 = env.fromSource(numberSequenceSource,
>       WatermarkStrategy
>         .forMonotonousTimestamps[Long]()
>         .withTimestampAssigner(new SerializableTimestampAssigner[Long] {
>           override def extractTimestamp(element: Long, recordTimestamp: Long): Long = {
>             Instant.now().toEpochMilli - (1000L * 60L * 60L)
>           }
>         })
>         .withIdleness(Duration.ofMillis(idleMillis)),
>       "source"
>     )
>
>     stream1
>       .process(new PrintWatermarkProcess("stream1"))
>       .returns(classOf[Long])
>       .startNewChain()
>       .union(
>         stream2
>           .process(new PrintWatermarkProcess("stream2"))
>           .returns(classOf[Long])
>           .startNewChain()
>           .process(new PrintWatermarkProcess("stream3"))
>           .returns(classOf[Long])
>           .startNewChain()
>       )
>       .process(new PrintWatermarkProcess("union"))
>       .returns(classOf[Long])
>       .filter(value => false)
>       .print()
>
>     env.execute()
>   }
> }
>
> class PrintWatermarkProcess(operatorName: String) extends ProcessFunction[Long, Long] {
>   override def processElement(value: Long, ctx: ProcessFunction[Long, Long]#Context, out: Collector[Long]): Unit = {
>     out.collect(value)
>     val watermark = ctx.timerService().currentWatermark()
>     if (watermark > 0 && watermark < 2222222222222L) {
>       Instant.ofEpochMilli(watermark)
>       val datetimeStr =
>         DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.systemDefault()).format(Instant.ofEpochMilli(watermark))
>       // println(operatorName + " " + datetimeStr)
>     }
>   }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)