I have a pipeline that:
1. Reads from KafkaIO
2. Processes the events and writes windowed files via FileIO
3. Applies a stateful DoFn to the info about the written files
The stateful DoFn contains logic that depends on PaneInfo.Timing, i.e.
whether it is EARLY or something else. With DirectRunner everything works
as expected, but with FlinkRunner the pane is always PaneInfo.NO_FIRING.
To demonstrate this, here is a dummy test pipeline:
val testStream = sc.testStream(testStreamOf[String]
  .advanceWatermarkTo(new Instant(1))
  .addElements(goodMessage, goodMessage)
  .advanceWatermarkTo(new Instant(2))
  .addElements(goodMessage, goodMessage)
  .advanceWatermarkTo(new Instant(2000000))
  .addElements(goodMessage, goodMessage)
  .advanceWatermarkToInfinity())

testStream
  .withFixedWindows(
    duration = Duration.standardSeconds(1),
    options = WindowOptions(
      trigger = AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterPane.elementCountAtLeast(1))
        .withLateFirings(AfterPane.elementCountAtLeast(1)),
      accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
      allowedLateness = Duration.standardDays(1)
    ))
  .keyBy(_ => "static_key")
  .withPaneInfo
  .map { case (element, paneInfo) =>
    println(s"#1 - $paneInfo")
    element
  }
  //.groupByKey // <- need to uncomment this for Flink to work
  .applyTransform(Reshuffle.viaRandomKey())
  .withPaneInfo
  .map { case (element, paneInfo) =>
    println(s"#2 - $paneInfo")
    element
  }
When executed with DirectRunner, #1 prints panes with UNKNOWN timing and #2
prints panes with EARLY timing, which is what I expect. With FlinkRunner,
both #1 and #2 print UNKNOWN timing from PaneInfo.NO_FIRING. Only if I add
the extra GBK does #2 print panes with EARLY timing.
This is on Beam 2.19. I tried to analyze where the problem could be but got
lost. I would be grateful for any suggestions or pointers. Does this sound
like a bug, or am I doing something wrong?