[
https://issues.apache.org/jira/browse/FLINK-29380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
xiechenling updated FLINK-29380:
--------------------------------
Description:
When two streams are unioned, the watermark of the downstream operator is wrong:
it is not the minimum of the input watermarks. With the connect operator, the
watermark is correct.
!image-2022-09-21-17-59-01-846.png!
This appears to be related to watermark idleness. In Flink 1.13.1, the watermark
is correct whether or not idleness is configured. In Flink 1.15.2, the watermark
is correct when idleness is not configured or is set to 1000 ms, but it is wrong
when idleness is set to 1 ms.
!screenshot-1.png!
!screenshot-2.png!
!screenshot-3.png!
What I don't understand is this: if the maximum watermark is emitted because of
idleness, making the watermark of the union operator incorrect, why is the
watermark of the operator before the union operator correct?
{code:scala}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner,
WatermarkStrategy}
import org.apache.flink.api.connector.source.Source
import org.apache.flink.api.connector.source.lib.NumberSequenceSource
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import java.time.format.DateTimeFormatter
import java.time.{Duration, Instant, ZoneId}
import java.util
/**
 * Local reproducer for FLINK-29380: the watermark of the operator fed by `union` is
 * not the minimum of its inputs' watermarks when one input uses a 1 ms idleness timeout.
 *
 * Both inputs read the same number-sequence source with parallelism 2:
 *  - `stream1` stamps every record with the current wall-clock time;
 *  - `stream2` stamps every record one hour earlier and is configured with
 *    `withIdleness(Duration.ofMillis(idleMillis))`.
 * Each branch goes through [[PrintWatermarkProcess]] operators. The union result goes
 * through one more of them before being filtered out entirely.
 */
object UnionWaterMarkTest {
def main(args: Array[String]): Unit = {
// Local environment with the Web UI enabled; the report inspects watermarks there.
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new
Configuration)
env.setParallelism(2)
// Number-sequence source over 0L to 100000000L; the same instance backs both streams.
// NOTE(review): the cast presumably re-types the source's element type to scala.Long so
// it lines up with WatermarkStrategy[Long] below -- confirm against NumberSequenceSource.
val numberSequenceSource: Source[Long,
NumberSequenceSource.NumberSequenceSplit,
util.Collection[NumberSequenceSource.NumberSequenceSplit]] = new
NumberSequenceSource(0L, 100000000L)
.asInstanceOf[Source[Long,
NumberSequenceSource.NumberSequenceSplit,
util.Collection[NumberSequenceSource.NumberSequenceSplit]]]
// stream1: event time = wall-clock time at extraction, monotonous watermarks, no idleness.
val stream1 = env.fromSource(numberSequenceSource,
WatermarkStrategy
.forMonotonousTimestamps[Long]()
.withTimestampAssigner(new SerializableTimestampAssigner[Long] {
override def extractTimestamp(element: Long, recordTimestamp: Long):
Long = {
Instant.now().toEpochMilli
}
}),
"source"
)
// Idleness timeout for stream2. Per the report, 1 ms shows the wrong union watermark,
// while leaving idleness unset or using 1000 ms does not.
val idleMillis = 1L
// stream2: event time = wall-clock time minus one hour (1000 * 60 * 60 ms), so its
// watermark should be the minimum seen by the union.
val stream2 = env.fromSource(numberSequenceSource,
WatermarkStrategy
.forMonotonousTimestamps[Long]()
.withTimestampAssigner(new SerializableTimestampAssigner[Long] {
override def extractTimestamp(element: Long, recordTimestamp: Long):
Long = {
Instant.now().toEpochMilli - (1000L * 60L * 60L)
}
})
.withIdleness(Duration.ofMillis(idleMillis))
,
"source"
)
// Topology: stream1 -> "stream1"; stream2 -> "stream2" -> "stream3"; both unioned into
// "union". The pre-union operators call startNewChain(), presumably so each operator's
// watermark can be read separately in the Web UI. The filter drops every record, so
// print() receives nothing.
stream1
.process(new PrintWatermarkProcess("stream1"))
.returns(classOf[Long])
.startNewChain()
.union(
stream2
.process(new PrintWatermarkProcess("stream2"))
.returns(classOf[Long])
.startNewChain()
.process(new PrintWatermarkProcess("stream3"))
.returns(classOf[Long])
.startNewChain()
)
.process(new PrintWatermarkProcess("union"))
.returns(classOf[Long])
.filter(value => false)
.print()
env.execute()
}
}
/**
 * Pass-through [[ProcessFunction]] used to observe the current watermark of an operator.
 *
 * Every element is forwarded unchanged. When `printWatermark` is enabled, the operator's
 * current watermark is printed, prefixed with `operatorName`, as an ISO local date-time in
 * the system default zone. It is printed only while it lies in a plausible wall-clock range:
 * values `<= 0` (such as `Long.MinValue`) are skipped. So are values `>= 2222222222222L`
 * (around the year 2040), which excludes sentinels such as `Long.MaxValue`.
 *
 * @param operatorName   label prefixed to each printed line
 * @param printWatermark whether to print the watermark; defaults to `false`, matching the
 *                       previous behaviour where the `println` was commented out
 */
class PrintWatermarkProcess(operatorName: String, printWatermark: Boolean = false)
  extends ProcessFunction[Long, Long] {

  override def processElement(
      value: Long,
      ctx: ProcessFunction[Long, Long]#Context,
      out: Collector[Long]): Unit = {
    out.collect(value)
    // Only read and format the watermark when it will actually be printed; the previous
    // version formatted it on every element and then discarded the result.
    if (printWatermark) {
      val watermark = ctx.timerService().currentWatermark()
      if (watermark > 0 && watermark < PrintWatermarkProcess.MaxPrintableWatermark) {
        val datetimeStr =
          PrintWatermarkProcess.WatermarkFormatter.format(Instant.ofEpochMilli(watermark))
        println(operatorName + " " + datetimeStr)
      }
    }
  }
}

/** Shared constants for [[PrintWatermarkProcess]]. */
object PrintWatermarkProcess {

  /** Exclusive upper bound (epoch millis, around 2040) for watermarks worth printing. */
  private val MaxPrintableWatermark: Long = 2222222222222L

  // Kept in the companion object rather than as a field: DateTimeFormatter is not
  // Serializable, but ProcessFunction instances are serialized with the job graph.
  private val WatermarkFormatter: DateTimeFormatter =
    DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.systemDefault())
}
{code}
was:
When two streams are unioned, the watermark of the downstream operator is wrong:
it is not the minimum of the input watermarks. With the connect operator, the
watermark is correct.
!image-2022-09-21-17-59-01-846.png!
This appears to be related to watermark idleness. In Flink 1.13.1, the watermark
is correct whether or not idleness is configured. In Flink 1.15.2, the watermark
is correct when idleness is not configured or is set to 1000 ms, but it is wrong
when idleness is set to 1 ms.
!screenshot-1.png!
!screenshot-2.png!
!screenshot-3.png!
{code:scala}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner,
WatermarkStrategy}
import org.apache.flink.api.connector.source.Source
import org.apache.flink.api.connector.source.lib.NumberSequenceSource
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import java.time.format.DateTimeFormatter
import java.time.{Duration, Instant, ZoneId}
import java.util
/**
 * Local reproducer for FLINK-29380: the watermark of the operator fed by `union` is
 * not the minimum of its inputs' watermarks when one input uses a 1 ms idleness timeout.
 *
 * Both inputs read the same number-sequence source with parallelism 2:
 *  - `stream1` stamps every record with the current wall-clock time;
 *  - `stream2` stamps every record one hour earlier and is configured with
 *    `withIdleness(Duration.ofMillis(idleMillis))`.
 * Each branch goes through [[PrintWatermarkProcess]] operators. The union result goes
 * through one more of them before being filtered out entirely.
 */
object UnionWaterMarkTest {
def main(args: Array[String]): Unit = {
// Local environment with the Web UI enabled; the report inspects watermarks there.
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new
Configuration)
env.setParallelism(2)
// Number-sequence source over 0L to 100000000L; the same instance backs both streams.
// NOTE(review): the cast presumably re-types the source's element type to scala.Long so
// it lines up with WatermarkStrategy[Long] below -- confirm against NumberSequenceSource.
val numberSequenceSource: Source[Long,
NumberSequenceSource.NumberSequenceSplit,
util.Collection[NumberSequenceSource.NumberSequenceSplit]] = new
NumberSequenceSource(0L, 100000000L)
.asInstanceOf[Source[Long,
NumberSequenceSource.NumberSequenceSplit,
util.Collection[NumberSequenceSource.NumberSequenceSplit]]]
// stream1: event time = wall-clock time at extraction, monotonous watermarks, no idleness.
val stream1 = env.fromSource(numberSequenceSource,
WatermarkStrategy
.forMonotonousTimestamps[Long]()
.withTimestampAssigner(new SerializableTimestampAssigner[Long] {
override def extractTimestamp(element: Long, recordTimestamp: Long):
Long = {
Instant.now().toEpochMilli
}
}),
"source"
)
// Idleness timeout for stream2. Per the report, 1 ms shows the wrong union watermark,
// while leaving idleness unset or using 1000 ms does not.
val idleMillis = 1L
// stream2: event time = wall-clock time minus one hour (1000 * 60 * 60 ms), so its
// watermark should be the minimum seen by the union.
val stream2 = env.fromSource(numberSequenceSource,
WatermarkStrategy
.forMonotonousTimestamps[Long]()
.withTimestampAssigner(new SerializableTimestampAssigner[Long] {
override def extractTimestamp(element: Long, recordTimestamp: Long):
Long = {
Instant.now().toEpochMilli - (1000L * 60L * 60L)
}
})
.withIdleness(Duration.ofMillis(idleMillis))
,
"source"
)
// Topology: stream1 -> "stream1"; stream2 -> "stream2" -> "stream3"; both unioned into
// "union". The pre-union operators call startNewChain(), presumably so each operator's
// watermark can be read separately in the Web UI. The filter drops every record, so
// print() receives nothing.
stream1
.process(new PrintWatermarkProcess("stream1"))
.returns(classOf[Long])
.startNewChain()
.union(
stream2
.process(new PrintWatermarkProcess("stream2"))
.returns(classOf[Long])
.startNewChain()
.process(new PrintWatermarkProcess("stream3"))
.returns(classOf[Long])
.startNewChain()
)
.process(new PrintWatermarkProcess("union"))
.returns(classOf[Long])
.filter(value => false)
.print()
env.execute()
}
}
/**
 * Pass-through [[ProcessFunction]] used to observe the current watermark of an operator.
 *
 * Every element is forwarded unchanged. When `printWatermark` is enabled, the operator's
 * current watermark is printed, prefixed with `operatorName`, as an ISO local date-time in
 * the system default zone. It is printed only while it lies in a plausible wall-clock range:
 * values `<= 0` (such as `Long.MinValue`) are skipped. So are values `>= 2222222222222L`
 * (around the year 2040), which excludes sentinels such as `Long.MaxValue`.
 *
 * @param operatorName   label prefixed to each printed line
 * @param printWatermark whether to print the watermark; defaults to `false`, matching the
 *                       previous behaviour where the `println` was commented out
 */
class PrintWatermarkProcess(operatorName: String, printWatermark: Boolean = false)
  extends ProcessFunction[Long, Long] {

  override def processElement(
      value: Long,
      ctx: ProcessFunction[Long, Long]#Context,
      out: Collector[Long]): Unit = {
    out.collect(value)
    // Only read and format the watermark when it will actually be printed; the previous
    // version formatted it on every element and then discarded the result.
    if (printWatermark) {
      val watermark = ctx.timerService().currentWatermark()
      if (watermark > 0 && watermark < PrintWatermarkProcess.MaxPrintableWatermark) {
        val datetimeStr =
          PrintWatermarkProcess.WatermarkFormatter.format(Instant.ofEpochMilli(watermark))
        println(operatorName + " " + datetimeStr)
      }
    }
  }
}

/** Shared constants for [[PrintWatermarkProcess]]. */
object PrintWatermarkProcess {

  /** Exclusive upper bound (epoch millis, around 2040) for watermarks worth printing. */
  private val MaxPrintableWatermark: Long = 2222222222222L

  // Kept in the companion object rather than as a field: DateTimeFormatter is not
  // Serializable, but ProcessFunction instances are serialized with the job graph.
  private val WatermarkFormatter: DateTimeFormatter =
    DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.systemDefault())
}
{code}
> Two streams union, watermark error, not the minimum value
> ---------------------------------------------------------
>
> Key: FLINK-29380
> URL: https://issues.apache.org/jira/browse/FLINK-29380
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.16.0, 1.15.2
> Reporter: xiechenling
> Priority: Blocker
> Fix For: 1.16.0, 1.15.3
>
> Attachments: image-2022-09-21-17-59-01-846.png, screenshot-1.png,
> screenshot-2.png, screenshot-3.png
>
>
> When two streams are unioned, the watermark of the downstream operator is
> wrong: it is not the minimum of the input watermarks. With the connect
> operator, the watermark is correct.
> !image-2022-09-21-17-59-01-846.png!
> This appears to be related to watermark idleness. In Flink 1.13.1, the
> watermark is correct whether or not idleness is configured. In Flink 1.15.2,
> the watermark is correct when idleness is not configured or is set to 1000 ms,
> but it is wrong when idleness is set to 1 ms.
> !screenshot-1.png!
> !screenshot-2.png!
> !screenshot-3.png!
> What I don't understand is this: if the maximum watermark is emitted because
> of idleness, making the watermark of the union operator incorrect, why is the
> watermark of the operator before the union operator correct?
> {code:scala}
> import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner,
> WatermarkStrategy}
> import org.apache.flink.api.connector.source.Source
> import org.apache.flink.api.connector.source.lib.NumberSequenceSource
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.util.Collector
> import java.time.format.DateTimeFormatter
> import java.time.{Duration, Instant, ZoneId}
> import java.util
> object UnionWaterMarkTest {
> def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new
> Configuration)
> env.setParallelism(2)
> val numberSequenceSource: Source[Long,
> NumberSequenceSource.NumberSequenceSplit,
> util.Collection[NumberSequenceSource.NumberSequenceSplit]] = new
> NumberSequenceSource(0L, 100000000L)
> .asInstanceOf[Source[Long,
> NumberSequenceSource.NumberSequenceSplit,
> util.Collection[NumberSequenceSource.NumberSequenceSplit]]]
> val stream1 = env.fromSource(numberSequenceSource,
> WatermarkStrategy
> .forMonotonousTimestamps[Long]()
> .withTimestampAssigner(new SerializableTimestampAssigner[Long] {
> override def extractTimestamp(element: Long, recordTimestamp:
> Long): Long = {
> Instant.now().toEpochMilli
> }
> }),
> "source"
> )
> val idleMillis = 1L
> val stream2 = env.fromSource(numberSequenceSource,
> WatermarkStrategy
> .forMonotonousTimestamps[Long]()
> .withTimestampAssigner(new SerializableTimestampAssigner[Long] {
> override def extractTimestamp(element: Long, recordTimestamp:
> Long): Long = {
> Instant.now().toEpochMilli - (1000L * 60L * 60L)
> }
> })
> .withIdleness(Duration.ofMillis(idleMillis))
> ,
> "source"
> )
> stream1
> .process(new PrintWatermarkProcess("stream1"))
> .returns(classOf[Long])
> .startNewChain()
> .union(
> stream2
> .process(new PrintWatermarkProcess("stream2"))
> .returns(classOf[Long])
> .startNewChain()
> .process(new PrintWatermarkProcess("stream3"))
> .returns(classOf[Long])
> .startNewChain()
> )
> .process(new PrintWatermarkProcess("union"))
> .returns(classOf[Long])
> .filter(value => false)
> .print()
> env.execute()
> }
> }
> class PrintWatermarkProcess(operatorName: String) extends
> ProcessFunction[Long, Long] {
> override def processElement(value: Long, ctx: ProcessFunction[Long,
> Long]#Context, out: Collector[Long]): Unit = {
> out.collect(value)
> val watermark = ctx.timerService().currentWatermark()
> if (watermark > 0 && watermark < 2222222222222L) {
> Instant.ofEpochMilli(watermark)
> val datetimeStr =
> DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.systemDefault()).format(Instant.ofEpochMilli(watermark))
> // println(operatorName + " " + datetimeStr)
> }
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)