[ 
https://issues.apache.org/jira/browse/FLINK-29380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiechenling updated FLINK-29380:
--------------------------------
    Description: 
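When two streams are combined with union, the resulting watermark is wrong: it 
is not the minimum of the input watermarks. When the streams are combined with 
connect instead, the operator's watermark is correct.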
!image-2022-09-21-17-59-01-846.png!

This behavior appears to be related to watermark idleness. In Flink 1.13.1, the 
watermark is correct whether or not an idleness timeout is set. In Flink 1.15.2, 
the watermark is correct when no idleness timeout is set or when it is set to 
1000 ms, but it is wrong when the idleness timeout is set to 1 ms.

 !screenshot-1.png! 

 !screenshot-2.png! 




{code:scala}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, 
WatermarkStrategy}
import org.apache.flink.api.connector.source.Source
import org.apache.flink.api.connector.source.lib.NumberSequenceSource
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

import java.time.format.DateTimeFormatter
import java.time.{Duration, Instant, ZoneId}
import java.util

/**
 * Reproducer for FLINK-29380: after two streams are unioned, the downstream operator's
 * watermark is not the minimum of its inputs' watermarks when one input uses a very short
 * idleness timeout.
 *
 * Both inputs read the same number sequence. `stream1` stamps every record with the current
 * wall-clock time, while `stream2` stamps records one hour in the past and is configured with
 * an idleness timeout of `idleMillis` (1 ms). The "union" process operator would therefore be
 * expected to see a watermark roughly one hour behind wall-clock time; the report states that
 * on Flink 1.15.2 with a 1 ms idleness timeout it does not.
 */
object UnionWaterMarkTest {
  def main(args: Array[String]): Unit = {

    // Local environment with the Web UI enabled (presumably so per-operator watermarks can be
    // inspected there). Parallelism 2 is the default for every operator in this job.
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new 
Configuration)
    env.setParallelism(2)

    // Number sequence with bounds 0L and 100000000L, shared by both sources below.
    // The unchecked cast re-types the element type as scala.Long. NOTE(review): presumably
    // NumberSequenceSource is declared over java.lang.Long, which the Scala-typed watermark
    // strategies below would not accept directly; confirm.
    val numberSequenceSource: Source[Long,
      NumberSequenceSource.NumberSequenceSplit,
      util.Collection[NumberSequenceSource.NumberSequenceSplit]] = new 
NumberSequenceSource(0L, 100000000L)
      .asInstanceOf[Source[Long,
      NumberSequenceSource.NumberSequenceSplit,
      util.Collection[NumberSequenceSource.NumberSequenceSplit]]]

    // stream1: the event timestamp is the wall-clock time at extraction (the element value is
    // ignored). It uses monotonous-timestamp watermarks and no idleness timeout.
    val stream1 = env.fromSource(numberSequenceSource,
      WatermarkStrategy
        .forMonotonousTimestamps[Long]()
        .withTimestampAssigner(new SerializableTimestampAssigner[Long] {
          override def extractTimestamp(element: Long, recordTimestamp: Long): 
Long = {
            Instant.now().toEpochMilli
          }
        }),
      "source"
    )

    // Idleness timeout for stream2, in milliseconds. Per the report, 1 ms reproduces the wrong
    // union watermark on 1.15.2, while 1000 ms (or no idleness) does not.
    val idleMillis = 1L
    // stream2: timestamps lag wall-clock time by one hour, so its watermark should be the
    // minimum at the union. It uses the same strategy as stream1 plus withIdleness(idleMillis).
    // NOTE(review): both sources share the name "source", which makes them hard to tell apart
    // in the Web UI.
    val stream2 = env.fromSource(numberSequenceSource,
      WatermarkStrategy
        .forMonotonousTimestamps[Long]()
        .withTimestampAssigner(new SerializableTimestampAssigner[Long] {
          override def extractTimestamp(element: Long, recordTimestamp: Long): 
Long = {
            Instant.now().toEpochMilli - (1000L * 60L * 60L)
          }
        })
        .withIdleness(Duration.ofMillis(idleMillis))
      ,
      "source"
    )

    // Topology:
    //   - stream1 goes through the "stream1" process operator.
    //   - stream2 goes through the "stream2" and then the "stream3" process operators.
    //   - Both branches are unioned and fed into the "union" process operator.
    // startNewChain() starts a new operator chain at each marked step, and
    // returns(classOf[Long]) sets each process operator's output type explicitly.
    // The filter drops every record, so print() emits nothing; the job presumably exists only
    // to drive watermarks through the operators for observation.
    stream1
      .process(new PrintWatermarkProcess("stream1"))
      .returns(classOf[Long])
      .startNewChain()
      .union(
        stream2
          .process(new PrintWatermarkProcess("stream2"))
          .returns(classOf[Long])
          .startNewChain()
          .process(new PrintWatermarkProcess("stream3"))
          .returns(classOf[Long])
          .startNewChain()
      )
      .process(new PrintWatermarkProcess("union"))
      .returns(classOf[Long])
      .filter(value => false)
      .print()

    env.execute()

  }

}

/**
 * Pass-through [[ProcessFunction]] used to observe the current watermark seen by an operator.
 *
 * Every element is forwarded unchanged. After forwarding, the operator's current watermark is
 * read. When `verbose` is enabled and the watermark lies strictly between the printable bounds,
 * it is printed next to `operatorName` as an ISO local date-time in the JVM's default time zone.
 *
 * @param operatorName label printed in front of each watermark
 * @param verbose      whether to print watermarks; defaults to `false` so that nothing is
 *                     printed for high-volume sources unless explicitly requested
 */
class PrintWatermarkProcess(operatorName: String, verbose: Boolean = false)
    extends ProcessFunction[Long, Long] {

  import PrintWatermarkProcess._

  override def processElement(
      value: Long,
      ctx: ProcessFunction[Long, Long]#Context,
      out: Collector[Long]): Unit = {
    out.collect(value)
    val watermark = ctx.timerService().currentWatermark()
    // The bounds skip values that are not meaningful as timestamps. NOTE(review): presumably
    // these are the Long.MinValue / Long.MaxValue sentinels before the first watermark and at
    // the end of input; confirm.
    if (verbose && watermark > MinPrintableWatermark && watermark < MaxPrintableWatermark) {
      val datetimeStr = WatermarkFormatter.format(Instant.ofEpochMilli(watermark))
      println(operatorName + "  " + datetimeStr)
    }
  }
}

object PrintWatermarkProcess {
  // Exclusive bounds (epoch millis) of the watermarks that get printed.
  private val MinPrintableWatermark = 0L
  private val MaxPrintableWatermark = 2222222222222L

  // Built once instead of per record; DateTimeFormatter is immutable and thread-safe.
  private val WatermarkFormatter: DateTimeFormatter =
    DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.systemDefault())
}

{code}

  was:
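When two streams are combined with union, the resulting watermark is wrong: it 
is not the minimum of the input watermarks. When the streams are combined with 
connect instead, the operator's watermark is correct.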
!image-2022-09-21-17-59-01-846.png!

This behavior appears to be related to watermark idleness. In Flink 1.13.1, the 
watermark is correct whether or not an idleness timeout is set. In Flink 1.15.2, 
the watermark is correct when no idleness timeout is set or when it is set to 
1000 ms, but it is wrong when the idleness timeout is set to 1 ms.

 !screenshot-1.png! 

 !screenshot-2.png! 


{code:scala}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, 
WatermarkStrategy}
import org.apache.flink.api.connector.source.Source
import org.apache.flink.api.connector.source.lib.NumberSequenceSource
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

import java.time.format.DateTimeFormatter
import java.time.{Duration, Instant, ZoneId}
import java.util

/**
 * Reproducer for FLINK-29380: after two streams are unioned, the downstream operator's
 * watermark is not the minimum of its inputs' watermarks when one input uses a very short
 * idleness timeout.
 *
 * Both inputs read the same number sequence. `stream1` stamps every record with the current
 * wall-clock time, while `stream2` stamps records one hour in the past and is configured with
 * an idleness timeout of `idleMillis` (1 ms). The "union" process operator would therefore be
 * expected to see a watermark roughly one hour behind wall-clock time; the report states that
 * on Flink 1.15.2 with a 1 ms idleness timeout it does not.
 */
object UnionWaterMarkTest {
  def main(args: Array[String]): Unit = {

    // Local environment with the Web UI enabled (presumably so per-operator watermarks can be
    // inspected there). Parallelism 2 is the default for every operator in this job.
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new 
Configuration)
    env.setParallelism(2)

    // Number sequence with bounds 0L and 100000000L, shared by both sources below.
    // The unchecked cast re-types the element type as scala.Long. NOTE(review): presumably
    // NumberSequenceSource is declared over java.lang.Long, which the Scala-typed watermark
    // strategies below would not accept directly; confirm.
    val numberSequenceSource: Source[Long,
      NumberSequenceSource.NumberSequenceSplit,
      util.Collection[NumberSequenceSource.NumberSequenceSplit]] = new 
NumberSequenceSource(0L, 100000000L)
      .asInstanceOf[Source[Long,
      NumberSequenceSource.NumberSequenceSplit,
      util.Collection[NumberSequenceSource.NumberSequenceSplit]]]

    // stream1: the event timestamp is the wall-clock time at extraction (the element value is
    // ignored). It uses monotonous-timestamp watermarks and no idleness timeout.
    val stream1 = env.fromSource(numberSequenceSource,
      WatermarkStrategy
        .forMonotonousTimestamps[Long]()
        .withTimestampAssigner(new SerializableTimestampAssigner[Long] {
          override def extractTimestamp(element: Long, recordTimestamp: Long): 
Long = {
            Instant.now().toEpochMilli
          }
        }),
      "source"
    )

    // Idleness timeout for stream2, in milliseconds. Per the report, 1 ms reproduces the wrong
    // union watermark on 1.15.2, while 1000 ms (or no idleness) does not.
    val idleMillis = 1L
    // stream2: timestamps lag wall-clock time by one hour, so its watermark should be the
    // minimum at the union. It uses the same strategy as stream1 plus withIdleness(idleMillis).
    // NOTE(review): both sources share the name "source", which makes them hard to tell apart
    // in the Web UI.
    val stream2 = env.fromSource(numberSequenceSource,
      WatermarkStrategy
        .forMonotonousTimestamps[Long]()
        .withTimestampAssigner(new SerializableTimestampAssigner[Long] {
          override def extractTimestamp(element: Long, recordTimestamp: Long): 
Long = {
            Instant.now().toEpochMilli - (1000L * 60L * 60L)
          }
        })
        .withIdleness(Duration.ofMillis(idleMillis))
      ,
      "source"
    )

    // Topology:
    //   - stream1 goes through the "stream1" process operator.
    //   - stream2 goes through the "stream2" and then the "stream3" process operators.
    //   - Both branches are unioned and fed into the "union" process operator.
    // startNewChain() starts a new operator chain at each marked step, and
    // returns(classOf[Long]) sets each process operator's output type explicitly.
    // The filter drops every record, so print() emits nothing; the job presumably exists only
    // to drive watermarks through the operators for observation.
    stream1
      .process(new PrintWatermarkProcess("stream1"))
      .returns(classOf[Long])
      .startNewChain()
      .union(
        stream2
          .process(new PrintWatermarkProcess("stream2"))
          .returns(classOf[Long])
          .startNewChain()
          .process(new PrintWatermarkProcess("stream3"))
          .returns(classOf[Long])
          .startNewChain()
      )
      .process(new PrintWatermarkProcess("union"))
      .returns(classOf[Long])
      .filter(value => false)
      .print()

    env.execute()

  }

}

/**
 * Pass-through [[ProcessFunction]] used to observe the current watermark seen by an operator.
 *
 * Every element is forwarded unchanged. After forwarding, the operator's current watermark is
 * read. When `verbose` is enabled and the watermark lies strictly between the printable bounds,
 * it is printed next to `operatorName` as an ISO local date-time in the JVM's default time zone.
 *
 * @param operatorName label printed in front of each watermark
 * @param verbose      whether to print watermarks; defaults to `false` so that nothing is
 *                     printed for high-volume sources unless explicitly requested
 */
class PrintWatermarkProcess(operatorName: String, verbose: Boolean = false)
    extends ProcessFunction[Long, Long] {

  import PrintWatermarkProcess._

  override def processElement(
      value: Long,
      ctx: ProcessFunction[Long, Long]#Context,
      out: Collector[Long]): Unit = {
    out.collect(value)
    val watermark = ctx.timerService().currentWatermark()
    // The bounds skip values that are not meaningful as timestamps. NOTE(review): presumably
    // these are the Long.MinValue / Long.MaxValue sentinels before the first watermark and at
    // the end of input; confirm.
    if (verbose && watermark > MinPrintableWatermark && watermark < MaxPrintableWatermark) {
      val datetimeStr = WatermarkFormatter.format(Instant.ofEpochMilli(watermark))
      println(operatorName + "  " + datetimeStr)
    }
  }
}

object PrintWatermarkProcess {
  // Exclusive bounds (epoch millis) of the watermarks that get printed.
  private val MinPrintableWatermark = 0L
  private val MaxPrintableWatermark = 2222222222222L

  // Built once instead of per record; DateTimeFormatter is immutable and thread-safe.
  private val WatermarkFormatter: DateTimeFormatter =
    DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.systemDefault())
}

{code}


> Two streams union, watermark error, not the minimum value
> ---------------------------------------------------------
>
>                 Key: FLINK-29380
>                 URL: https://issues.apache.org/jira/browse/FLINK-29380
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.16.0, 1.15.2
>            Reporter: xiechenling
>            Priority: Blocker
>             Fix For: 1.16.0, 1.15.3
>
>         Attachments: image-2022-09-21-17-59-01-846.png, screenshot-1.png, 
> screenshot-2.png
>
>
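> When two streams are combined with union, the resulting watermark is wrong: it 
> is not the minimum of the input watermarks. When the streams are combined with 
> connect instead, the operator's watermark is correct.
> !image-2022-09-21-17-59-01-846.png!
> This behavior appears to be related to watermark idleness. In Flink 1.13.1, the 
> watermark is correct whether or not an idleness timeout is set. In Flink 1.15.2, 
> the watermark is correct when no idleness timeout is set or when it is set to 
> 1000 ms, but it is wrong when the idleness timeout is set to 1 ms.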
>  !screenshot-1.png! 
>  !screenshot-2.png! 
> {code:scala}
> import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, 
> WatermarkStrategy}
> import org.apache.flink.api.connector.source.Source
> import org.apache.flink.api.connector.source.lib.NumberSequenceSource
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.util.Collector
> import java.time.format.DateTimeFormatter
> import java.time.{Duration, Instant, ZoneId}
> import java.util
> object UnionWaterMarkTest {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new 
> Configuration)
>     env.setParallelism(2)
>     val numberSequenceSource: Source[Long,
>       NumberSequenceSource.NumberSequenceSplit,
>       util.Collection[NumberSequenceSource.NumberSequenceSplit]] = new 
> NumberSequenceSource(0L, 100000000L)
>       .asInstanceOf[Source[Long,
>       NumberSequenceSource.NumberSequenceSplit,
>       util.Collection[NumberSequenceSource.NumberSequenceSplit]]]
>     val stream1 = env.fromSource(numberSequenceSource,
>       WatermarkStrategy
>         .forMonotonousTimestamps[Long]()
>         .withTimestampAssigner(new SerializableTimestampAssigner[Long] {
>           override def extractTimestamp(element: Long, recordTimestamp: 
> Long): Long = {
>             Instant.now().toEpochMilli
>           }
>         }),
>       "source"
>     )
>     val idleMillis = 1L
>     val stream2 = env.fromSource(numberSequenceSource,
>       WatermarkStrategy
>         .forMonotonousTimestamps[Long]()
>         .withTimestampAssigner(new SerializableTimestampAssigner[Long] {
>           override def extractTimestamp(element: Long, recordTimestamp: 
> Long): Long = {
>             Instant.now().toEpochMilli - (1000L * 60L * 60L)
>           }
>         })
>         .withIdleness(Duration.ofMillis(idleMillis))
>       ,
>       "source"
>     )
>     stream1
>       .process(new PrintWatermarkProcess("stream1"))
>       .returns(classOf[Long])
>       .startNewChain()
>       .union(
>         stream2
>           .process(new PrintWatermarkProcess("stream2"))
>           .returns(classOf[Long])
>           .startNewChain()
>           .process(new PrintWatermarkProcess("stream3"))
>           .returns(classOf[Long])
>           .startNewChain()
>       )
>       .process(new PrintWatermarkProcess("union"))
>       .returns(classOf[Long])
>       .filter(value => false)
>       .print()
>     env.execute()
>   }
> }
> class PrintWatermarkProcess(operatorName: String) extends 
> ProcessFunction[Long, Long] {
>   override def processElement(value: Long, ctx: ProcessFunction[Long, 
> Long]#Context, out: Collector[Long]): Unit = {
>     out.collect(value)
>     val watermark = ctx.timerService().currentWatermark()
>     if (watermark > 0 && watermark < 2222222222222L) {
>       Instant.ofEpochMilli(watermark)
>       val datetimeStr = 
> DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.systemDefault()).format(Instant.ofEpochMilli(watermark))
> //      println(operatorName + "  " + datetimeStr)
>     }
>   }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to