Would it then be safe to enable the same behavior for Spark batch? I can create a JIRA and patch for this, if there is no other reason not to do so.
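For my own understanding, the streaming-side behavior Max describes below (buffer input to a @RequiresStableInput DoFn until a checkpoint completes) can be sketched as a toy model in plain Java. This is only an illustration of the idea, not the actual Flink Runner code, and the class/method names are made up:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model: input to a @RequiresStableInput DoFn is held back and only
// processed after a checkpoint completes, so a replay after failure sees
// exactly the same (stable) input.
class StableInputBuffer<T> {
  private final List<T> buffer = new ArrayList<>();
  private final Consumer<T> doFn; // stands in for the user's DoFn

  StableInputBuffer(Consumer<T> doFn) {
    this.doFn = doFn;
  }

  void processElement(T element) {
    buffer.add(element); // hold back until input is known to be stable
  }

  void onCheckpointComplete() {
    // Input up to the checkpoint is now durable; safe to run the DoFn.
    buffer.forEach(doFn);
    buffer.clear();
  }
}

public class Demo {
  public static void main(String[] args) {
    List<String> processed = new ArrayList<>();
    StableInputBuffer<String> op = new StableInputBuffer<>(processed::add);
    op.processElement("a");
    op.processElement("b");
    System.out.println(processed.size()); // prints 0: nothing processed yet
    op.onCheckpointComplete();
    System.out.println(processed); // prints [a, b]
  }
}
```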
On Wed, Jul 8, 2020 at 11:51 AM Maximilian Michels <m...@apache.org> wrote:
> Correct, for batch we rely on re-running the entire job, which will
> produce stable input within each run.
>
> For streaming, the Flink Runner buffers all input to a
> @RequiresStableInput DoFn until a checkpoint is complete; only then does
> it process the buffered data. Dataflow effectively does the same by going
> through the Shuffle service, which produces a consistent result.
>
> -Max
>
> On 08.07.20 11:08, Jozef Vilcek wrote:
> > My last question was more towards the graph translation for batch mode.
> >
> > Should a DoFn with @RequiresStableInput be translated/expanded in some
> > specific way (e.g. DoFn -> Reshuffle + DoFn), or is it not needed for
> > batch? Most runners fail in the presence of @RequiresStableInput for
> > both batch and streaming. I cannot find a failure for Flink and
> > Dataflow, but at the same time, I cannot find what those runners do
> > with such a DoFn.
> >
> > On Tue, Jul 7, 2020 at 9:18 PM Kenneth Knowles <k...@apache.org
> > <mailto:k...@apache.org>> wrote:
> >
> > I hope someone who knows better than me can respond.
> >
> > A long time ago, the SparkRunner added a call to materialize() at
> > every GroupByKey. This was to mimic Dataflow, since so many of the
> > initial IO transforms relied on using shuffle to create stable inputs.
> >
> > The overall goal is to be able to remove these extra calls to
> > materialize() and include them only when @RequiresStableInput is
> > present.
> >
> > The intermediate state is to analyze whether input is already stable
> > from materialize() and add another materialize() only if it is not
> > stable.
> >
> > I don't know the current state of the SparkRunner. This may already
> > have changed.
> >
> > Kenn
> >
> > On Thu, Jul 2, 2020 at 10:24 PM Jozef Vilcek <jozo.vil...@gmail.com
> > <mailto:jozo.vil...@gmail.com>> wrote:
> >
> > I was trying to look for references on how other runners handle
> > @RequiresStableInput for batch cases; however, I was not able to
> > find any.
> > In Flink I can see added support for the streaming case, and in
> > Dataflow I see that support for the feature was turned off:
> > https://github.com/apache/beam/pull/8065
> >
> > It seems to me that @RequiresStableInput is ignored for the batch
> > case and the runner relies on being able to recompute the whole job
> > in the worst-case scenario.
> > Is this assumption correct?
> > Could I just change SparkRunner to crash on the @RequiresStableInput
> > annotation in streaming mode and ignore it in batch?
> >
> > On Wed, Jul 1, 2020 at 10:27 AM Jozef Vilcek
> > <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>> wrote:
> >
> > We have a component which we use in both streaming and batch jobs.
> > The streaming jobs run on FlinkRunner and the batch jobs on
> > SparkRunner. Recently we needed to add @RequiresStableInput to that
> > component because of the streaming use case. But now the batch case
> > crashes on SparkRunner with:
> >
> > Caused by: java.lang.UnsupportedOperationException: Spark runner currently doesn't support @RequiresStableInput annotation.
> >     at org.apache.beam.runners.core.construction.UnsupportedOverrideFactory.getReplacementTransform(UnsupportedOverrideFactory.java:58)
> >     at org.apache.beam.sdk.Pipeline.applyReplacement(Pipeline.java:556)
> >     at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:292)
> >     at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:210)
> >     at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:168)
> >     at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:90)
> >     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
> >     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
> >     at com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:42)
> >     at com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:35)
> >     at scala.util.Try$.apply(Try.scala:192)
> >     at com.dp.pipeline.driver.PipelineDriver$class.main(PipelineDriver.scala:35)
> >
> > We are using Beam 2.19.0. Is @RequiresStableInput problematic to
> > support for both streaming and batch use cases? What are the options
> > here?
> > https://issues.apache.org/jira/browse/BEAM-5358
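As a side note on why shuffle/materialization provides the stable-input guarantee discussed in the thread: a retry that re-reads a nondeterministic source can observe different elements, while a retry that replays a materialized snapshot cannot. A dependency-free toy sketch of that distinction (not Beam or SparkRunner code; names are invented):

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration of why materializing input makes it stable:
// re-reading a nondeterministic source on retry yields different
// elements, while replaying a materialized snapshot yields identical ones.
public class StableInputDemo {
  private static int reads = 0;

  // A "source" whose output depends on when it is read.
  static List<Integer> readSource() {
    reads++;
    List<Integer> out = new ArrayList<>();
    out.add(reads); // nondeterministic across attempts
    return out;
  }

  public static void main(String[] args) {
    // Without materialization: a retry re-reads the source.
    List<Integer> attempt = readSource();
    List<Integer> retry = readSource();
    System.out.println(attempt.equals(retry)); // prints false

    // With materialization: the snapshot is replayed as-is on retry.
    List<Integer> snapshot = readSource();
    List<Integer> replay = new ArrayList<>(snapshot);
    System.out.println(snapshot.equals(replay)); // prints true
  }
}
```

This is the same reason re-running the entire batch job gives stable input within each run: every consumer in one attempt sees one consistent snapshot of its input.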