Hi Jozef, I think this is expected behavior, as Flink does not use the default
expansion for Reshuffle (it uses a round-robin rebalance ship strategy
instead). There is no aggregation that needs buffering (and triggering), so
all of the elements are immediately emitted to the downstream operations
after the Reshuffle and keep the pane info they arrived with.
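To illustrate the rebalance behavior, here is a toy model in plain Scala. It
is not the Beam or Flink API: `RebalanceModel`, `Element` and `Timing` are
hypothetical names, and it assumes a round-robin rebalance simply sends
element i to subtask i % parallelism. Because nothing is buffered or
re-triggered, every element keeps the pane timing it arrived with.

```scala
// Toy model only: these types are hypothetical stand-ins, not Beam/Flink classes.
object RebalanceModel {
  sealed trait Timing
  case object Unknown extends Timing // the timing PaneInfo.NO_FIRING reports
  case object Early extends Timing   // never produced by a plain rebalance

  // An element together with the pane timing it was emitted with.
  final case class Element(value: String, timing: Timing)

  // Assumed round-robin rebalance: element i goes to subtask i % parallelism.
  // No buffering and no trigger, so each element's timing passes through unchanged.
  def rebalance(elements: Seq[Element], parallelism: Int): Map[Int, Seq[Element]] = {
    require(parallelism > 0, "parallelism must be positive")
    elements.zipWithIndex
      .groupBy { case (_, i) => i % parallelism }
      .map { case (subtask, indexed) => subtask -> indexed.map(_._1) }
  }
}
```

Feeding three UNKNOWN-timing elements through `rebalance(input, 2)` spreads
them over two subtasks, and all of them still report UNKNOWN, which is what
#1 and #2 show on Flink.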

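In the case of the direct runner, the EARLY panes after the Reshuffle are
just a side effect of its default expansion, which is built on a GroupByKey
with a trigger that fires for every element. See
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L69
for more details.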
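A similarly hypothetical toy model of the GroupByKey-based expansion is
sketched below (again not Beam's API; `GbkExpansionModel`, `Pane` and
`groupAndFirePerElement` are illustrative names). It assumes the trigger
fires once per element, so each element leaves the grouping step in a
freshly created pane whose index counts firings per key and window. Only
firings before the end of the window are modelled, and those come out as
EARLY; ON_TIME and LATE panes are deliberately left out.

```scala
import scala.collection.mutable

// Toy model only: hypothetical types, not Beam's PaneInfo or GroupByKey.
object GbkExpansionModel {
  sealed trait Timing
  case object Unknown extends Timing // the timing PaneInfo.NO_FIRING reports
  case object Early extends Timing

  final case class Pane(timing: Timing, index: Long)
  final case class Element(key: String, value: String, windowEnd: Long, pane: Pane)

  val NoFiring: Pane = Pane(Unknown, 0L)

  // Assumed behavior of a GroupByKey whose trigger fires once per element:
  // every element is re-emitted in a new pane, and pane indices count firings
  // per (key, window). Only early firings are modelled, so the watermark must
  // still be before every element's window end.
  def groupAndFirePerElement(elements: Seq[Element], watermark: Long): Seq[Element] = {
    val firings = mutable.Map.empty[(String, Long), Long]
    elements.map { e =>
      require(watermark < e.windowEnd, "toy model only covers early firings")
      val slot = (e.key, e.windowEnd)
      val index = firings.getOrElse(slot, 0L)
      firings(slot) = index + 1
      e.copy(pane = Pane(Early, index))
    }
  }
}
```

The point of the sketch is that the new timing is assigned by the grouping
step itself, so it only shows up on runners that actually execute that step.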

I don't think we should expect Reshuffle to have the same semantics as GBK,
because it's only a performance optimization step that should not have any
effect on the pipeline's overall result. Some runners may also completely
ignore this step as part of their execution plan optimization (e.g. two
reshuffles in a row are idempotent, see
https://issues.apache.org/jira/browse/BEAM-9824).
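The idempotence point can be sketched the same way. In this hypothetical
toy (`ReshuffleIdempotenceModel` is not a runner API), a reshuffle only
changes which partition an element lands in, so applying it twice leaves
the multiset of elements unchanged. Anything that relies on a side effect
such as pane timing, rather than on the elements themselves, is not covered
by that guarantee.

```scala
// Toy model only: a hypothetical round-robin redistribution, not a runner API.
object ReshuffleIdempotenceModel {
  // Flattens the current partitions and deals elements round-robin into
  // `parallelism` new partitions. Elements are moved, never modified.
  def reshuffle[A](partitions: Seq[Seq[A]], parallelism: Int): Seq[Seq[A]] = {
    require(parallelism > 0, "parallelism must be positive")
    val indexed = partitions.flatten.zipWithIndex
    (0 until parallelism).map { p =>
      indexed.collect { case (a, i) if i % parallelism == p => a }
    }
  }

  // Multiset view of the data: element -> number of occurrences,
  // ignoring which partition each element sits in.
  def contents[A](partitions: Seq[Seq[A]]): Map[A, Int] =
    partitions.flatten.groupBy(identity).map { case (a, occurrences) => a -> occurrences.size }
}
```

Comparing `contents(reshuffle(reshuffle(xs, n), n))` with
`contents(reshuffle(xs, n))` gives the same map, which is why collapsing
consecutive reshuffles does not change the pipeline's result.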

D.

On Mon, May 4, 2020 at 2:48 PM Jozef Vilcek wrote:

> I have a pipeline which
>
> 1. Reads from KafkaIO
> 2. Does stuff with events and writes windowed files via FileIO
> 3. Applies a stateful DoFn to the info about the written files
>
> The stateful DoFn does some logic which depends on whether PaneInfo.Timing
> is EARLY or something else. When testing in DirectRunner, all is good. But
> with FlinkRunner, panes are always NO_FIRING.
>
> To demonstrate this, here is a dummy test pipeline:
>
>     val testStream = sc.testStream(testStreamOf[String]
>       .advanceWatermarkTo(new Instant(1))
>       .addElements(goodMessage, goodMessage)
>       .advanceWatermarkTo(new Instant(2))
>       .addElements(goodMessage, goodMessage)
>       .advanceWatermarkTo(new Instant(2000000))
>       .addElements(goodMessage, goodMessage)
>       .advanceWatermarkToInfinity())
>
>     testStream
>       .withFixedWindows(
>         duration = Duration.standardSeconds(1),
>         options = WindowOptions(
>           trigger = AfterWatermark.pastEndOfWindow()
>             .withEarlyFirings(AfterPane.elementCountAtLeast(1))
>             .withLateFirings(AfterPane.elementCountAtLeast(1)),
>           accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
>           allowedLateness = Duration.standardDays(1)
>         ))
>       .keyBy(_ => "static_key")
>       .withPaneInfo
>       .map { case (element, paneInfo) =>
>         println(s"#1 - $paneInfo")
>         element
>       }
>       //.groupByKey // <- need to uncomment this for Flink to work
>       .applyTransform(Reshuffle.viaRandomKey())
>       .withPaneInfo
>       .map { case (element, paneInfo) =>
>         println(s"#2 - $paneInfo")
>         element
>       }
>
> When executed with DirectRunner, #1 prints panes with UNKNOWN timing and #2
> with EARLY, which is what I expect. When run with the Flink runner, both #1
> and #2 print UNKNOWN timing from PaneInfo.NO_FIRING. Only if I add an extra
> GBK does #2 print panes with EARLY timing.
>
> This runs on Beam 2.19. I was trying to analyze where the problem could be
> but got lost. I will be happy for any suggestions or pointers. Does it
> sound like a bug, or am I doing something wrong?
>
