Then maybe the better question is: what is the behaviour / guarantee for propagating PaneInfo between steps of a pipeline?
If I do write files like this:

    PCollection<KV<String, String>> destFileNames =
        windowedData.apply(FileIO.write() ...).getPerDestinationOutputFilenames()

then even if the data written to the files is windowed and the materialised files are associated with particular triggered panes, the `destFileNames` PCollection does not necessarily carry that information; it is runner-dependent behaviour. In older versions of Beam the pane info was propagated. The reason is that internally WriteFiles uses Reshuffle (as do many other parts of Beam).

Now, is this expected with respect to the model and the API? And how does the PaneInfo actually "get lost" when Flink does its rebalance?
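To make the shape concrete, here is a rough sketch of what I mean in plain Beam Java. The class and method names, output path, destination function, sink and naming below are just placeholders of mine; I used writeDynamic() so the destination type is String, matching the KV<String, String> above. The point is only the structure: write windowed files, take getPerDestinationOutputFilenames(), and look at the pane of those elements downstream.

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.WriteFilesResult;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    public class WindowedFileWrite {

      /** windowedData is assumed to already be windowed with early firings. */
      static PCollection<KV<String, String>> writeAndGetFilenames(PCollection<String> windowedData) {
        WriteFilesResult<String> written =
            windowedData.apply(
                "WriteWindowedFiles",
                FileIO.<String, String>writeDynamic()
                    .by(element -> element.substring(0, 1))   // made-up destination function
                    .via(TextIO.sink())
                    .to("/tmp/out")                           // made-up output directory
                    .withDestinationCoder(StringUtf8Coder.of())
                    .withNumShards(1)
                    .withNaming(dest -> FileIO.Write.defaultNaming(dest, ".txt")));

        // One element per (destination, filename) for every pane that gets materialised.
        PCollection<KV<String, String>> destFileNames = written.getPerDestinationOutputFilenames();

        // Downstream consumer of the filenames. This is where the runners diverge for me:
        // DirectRunner shows the triggered pane here, FlinkRunner shows PaneInfo.NO_FIRING,
        // apparently because the Reshuffle inside WriteFiles is a plain rebalance on Flink.
        destFileNames.apply(
            "InspectPane",
            ParDo.of(
                new DoFn<KV<String, String>, Void>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    System.out.println(c.pane().getTiming() + " -> " + c.element());
                  }
                }));

        return destFileNames;
      }
    }

That last ParDo is the place where I see NO_FIRING on Flink while the DirectRunner shows the pane that actually triggered.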
On Tue, May 5, 2020 at 7:39 AM David Morávek <[email protected]> wrote:

> Hi Jozef, I think this is expected behavior as Flink does not use the default
> expansion for Reshuffle (it uses a round-robin rebalance ship strategy instead).
> There is no aggregation that needs buffering (and triggering). All of the
> elements are immediately emitted to downstream operations after the
> Reshuffle.
>
> In case of the direct runner, this is just a side-effect of the Reshuffle
> expansion. See
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L69
> for more details.
>
> I don't think we should expect Reshuffle to have the same semantics as
> GBK, because it's only a performance optimization step that should not
> have any effect on the pipeline's overall result. Some runners may also
> completely ignore this step as part of the execution plan optimization
> process (e.g. two reshuffles in a row are idempotent). (
> https://issues.apache.org/jira/browse/BEAM-9824)
>
> D.
>
> On Mon, May 4, 2020 at 2:48 PM Jozef Vilcek <[email protected]> wrote:
>
>> I have a pipeline which
>>
>> 1. Reads from KafkaIO
>> 2. Does stuff with events and writes windowed files via FileIO
>> 3. Applies a stateful DoFn on the written files info
>>
>> The stateful DoFn does some logic which depends on PaneInfo.Timing, whether
>> it is EARLY or something else. When testing in DirectRunner, all is good.
>> But with FlinkRunner, panes are always NO_FIRING.
>>
>> To demonstrate this, here is a dummy test pipeline:
>>
>> val testStream = sc.testStream(testStreamOf[String]
>>   .advanceWatermarkTo(new Instant(1))
>>   .addElements(goodMessage, goodMessage)
>>   .advanceWatermarkTo(new Instant(2))
>>   .addElements(goodMessage, goodMessage)
>>   .advanceWatermarkTo(new Instant(2000000))
>>   .addElements(goodMessage, goodMessage)
>>   .advanceWatermarkToInfinity())
>>
>> testStream
>>   .withFixedWindows(
>>     duration = Duration.standardSeconds(1),
>>     options = WindowOptions(
>>       trigger = AfterWatermark.pastEndOfWindow()
>>         .withEarlyFirings(AfterPane.elementCountAtLeast(1))
>>         .withLateFirings(AfterPane.elementCountAtLeast(1)),
>>       accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
>>       allowedLateness = Duration.standardDays(1)
>>     ))
>>   .keyBy(_ => "static_key")
>>   .withPaneInfo
>>   .map { case (element, paneInfo) =>
>>     println(s"#1 - $paneInfo")
>>     element
>>   }
>>   //.groupByKey // <- need to uncomment this for Flink to work
>>   .applyTransform(Reshuffle.viaRandomKey())
>>   .withPaneInfo
>>   .map { case (element, paneInfo) =>
>>     println(s"#2 - $paneInfo")
>>     element
>>   }
>>
>> When executed with DirectRunner, #1 prints panes with UNKNOWN timing and
>> #2 with EARLY, which is what I expect. When run with the Flink runner, both
>> #1 and #2 print UNKNOWN timing from PaneInfo.NO_FIRING. Only if I add an
>> extra GBK does #2 print panes with EARLY timing.
>>
>> This is run on Beam 2.19. I was trying to analyze where the problem could
>> be but got lost. I will be happy for any suggestions or pointers. Does it
>> sound like a bug or am I doing something wrong?
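Coming back to the commented-out groupByKey in the test above: a rough Beam Java equivalent of that workaround would look something like the sketch below. The static key mirrors the keyBy in the test; the branch on PaneInfo.Timing is only a placeholder for the real stateful logic, which I have left out.

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.WithKeys;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    public class GbkWorkaround {

      /**
       * filesInfo is assumed to be the windowed, early-firing collection from the
       * pipeline above. The explicit GroupByKey forces an aggregation, so elements
       * downstream of it carry the pane that actually fired.
       */
      static PCollection<String> branchOnPaneTiming(PCollection<String> filesInfo) {
        return filesInfo
            .apply("KeyForGbk",
                WithKeys.<String, String>of("static_key")
                    .withKeyType(TypeDescriptors.strings()))
            .apply("ForcePaneAssignment", GroupByKey.<String, String>create())
            .apply("BranchOnTiming", ParDo.of(
                new DoFn<KV<String, Iterable<String>>, String>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    // Placeholder for the real logic: it only needs to know whether
                    // this pane is an EARLY firing or not.
                    boolean early = c.pane().getTiming() == PaneInfo.Timing.EARLY;
                    for (String element : c.element().getValue()) {
                      c.output((early ? "early: " : "not-early: ") + element);
                    }
                  }
                }));
      }
    }

If I follow David's explanation, it is the GroupByKey's aggregation (and its triggering) that makes the runner assign a real pane again, which would explain why #2 only shows EARLY timing once the GBK is added.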
