My last question was more about the graph translation for batch mode. Should a DoFn with @RequiresStableInput be translated/expanded in some specific way (e.g. DoFn -> Reshuffle + DoFn), or is that not needed for batch? Most runners fail in the presence of @RequiresStableInput in both batch and streaming. I cannot find such a failure for Flink and Dataflow, but at the same time I cannot find what those runners actually do with such a DoFn.
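To make the question concrete, here is a minimal sketch of the kind of expansion I mean, as a pure function over a list of step names. All names here (Reshuffle, the step names) are illustrative; this is not Beam's actual translation/override API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: expand every DoFn that declares
// @RequiresStableInput into Reshuffle + DoFn, so that the shuffle
// materialization makes the input stable. Illustrative only; not
// Beam's real translation API.
class RequiresStableInputExpansion {

    // transforms: pipeline steps in topological order;
    // needsStableInput: names of steps annotated @RequiresStableInput.
    static List<String> expand(List<String> transforms, Set<String> needsStableInput) {
        List<String> expanded = new ArrayList<>();
        for (String t : transforms) {
            if (needsStableInput.contains(t)) {
                expanded.add("Reshuffle"); // checkpoint/materialize before the DoFn
            }
            expanded.add(t);
        }
        return expanded;
    }

    public static void main(String[] args) {
        System.out.println(expand(List.of("Read", "WriteFn"), Set.of("WriteFn")));
        // prints [Read, Reshuffle, WriteFn]
    }
}
```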
On Tue, Jul 7, 2020 at 9:18 PM Kenneth Knowles <k...@apache.org> wrote:

> I hope someone who knows better than me can respond.
>
> A long time ago, the SparkRunner added a call to materialize() at every
> GroupByKey. This was to mimic Dataflow, since so many of the initial IO
> transforms relied on using shuffle to create stable inputs.
>
> The overall goal is to be able to remove these extra calls to
> materialize() and only include them when @RequiresStableInput.
>
> The intermediate state is to analyze whether input is already stable from
> materialize() and add another materialize() only if it is not stable.
>
> I don't know the current state of the SparkRunner. This may already have
> changed.
>
> Kenn
>
> On Thu, Jul 2, 2020 at 10:24 PM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>> I was trying to look for references on how other runners handle
>> @RequiresStableInput for batch cases; however, I was not able to find any.
>> In Flink I can see added support for the streaming case, and in Dataflow I
>> can see that support for the feature was turned off:
>> https://github.com/apache/beam/pull/8065
>>
>> It seems to me that @RequiresStableInput is ignored for the batch case
>> and the runner relies on being able to recompute the whole job in the
>> worst-case scenario.
>> Is this assumption correct?
>> Could I just change SparkRunner to crash on the @RequiresStableInput
>> annotation in streaming mode and ignore it in batch?
>>
>> On Wed, Jul 1, 2020 at 10:27 AM Jozef Vilcek <jozo.vil...@gmail.com>
>> wrote:
>>
>>> We have a component which we use in streaming and batch jobs.
>>> Streaming we run on FlinkRunner and batch on SparkRunner. Recently we
>>> needed to add @RequiresStableInput to that component because of the
>>> streaming use-case. But now the batch case crashes on SparkRunner with:
>>>
>>> Caused by: java.lang.UnsupportedOperationException: Spark runner currently
>>> doesn't support @RequiresStableInput annotation.
>>> 	at org.apache.beam.runners.core.construction.UnsupportedOverrideFactory.getReplacementTransform(UnsupportedOverrideFactory.java:58)
>>> 	at org.apache.beam.sdk.Pipeline.applyReplacement(Pipeline.java:556)
>>> 	at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:292)
>>> 	at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:210)
>>> 	at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:168)
>>> 	at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:90)
>>> 	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
>>> 	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
>>> 	at com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:42)
>>> 	at com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:35)
>>> 	at scala.util.Try$.apply(Try.scala:192)
>>> 	at com.dp.pipeline.driver.PipelineDriver$class.main(PipelineDriver.scala:35)
>>>
>>> We are using Beam 2.19.0. Is @RequiresStableInput problematic to
>>> support for both the streaming and batch use-cases? What are the options
>>> here? https://issues.apache.org/jira/browse/BEAM-5358
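The "intermediate state" Kenn describes above (add a materialize() only when the input is not already stable) could be sketched roughly like this. All names are illustrative, not the SparkRunner's real internals:

```java
import java.util.List;

// Rough sketch of the stability analysis: before a step that
// declares @RequiresStableInput, insert a materialize() only when
// the producing step's output is not already stable. Illustrative
// names only; not the SparkRunner's actual translation code.
class StableInputPlanner {

    // Treat inputs coming out of a shuffle or an explicit
    // materialize() as already stable.
    static boolean isStable(String producer) {
        return producer.equals("GroupByKey") || producer.equals("Materialize");
    }

    // Steps to insert before the stable-input DoFn: none if the
    // producer's output is already stable.
    static List<String> prefixFor(String producer) {
        return isStable(producer) ? List.of() : List.of("Materialize");
    }

    public static void main(String[] args) {
        System.out.println(prefixFor("GroupByKey")); // prints []
        System.out.println(prefixFor("Read"));       // prints [Materialize]
    }
}
```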