I hope someone who knows better than me can respond. A long time ago, the SparkRunner added a call to materialize() at every GroupByKey. This was to mimic Dataflow, since so many of the initial IO transforms relied on using shuffle to create stable inputs.
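In plain Spark terms the effect is roughly the following (a minimal sketch of the idea, not the actual SparkRunner code; the class name and checkpoint directory here are made up, and whether the runner materializes via checkpoint or cache is an internal detail I won't claim):

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    public class StableInputSketch {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "stable-input-sketch");
        sc.setCheckpointDir("/tmp/checkpoints"); // made-up path

        JavaPairRDD<String, Integer> input = sc.parallelizePairs(Arrays.asList(
            new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3)));

        // Shuffle, then materialize: anything downstream of the checkpoint
        // replays the written data on retry instead of recomputing the
        // (possibly non-deterministic) upstream transforms.
        JavaPairRDD<String, Iterable<Integer>> grouped = input.groupByKey();
        grouped.checkpoint();
        grouped.count(); // force evaluation so the checkpoint is written

        sc.stop();
      }
    }

Dataflow's shuffle materializes as a side effect of grouping, which is why the early IOs could lean on GroupByKey for stability; in Spark the materialization has to be paid for explicitly, which is what those inserted materialize() calls do.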
The overall goal is to be able to remove these extra calls to materialize()
and only include them where @RequiresStableInput is present. The intermediate
state is to analyze whether the input is already stable (for example, because
of an existing materialize()) and add another materialize() only if it is not.

I don't know the current state of the SparkRunner. This may already have
changed.

Kenn

On Thu, Jul 2, 2020 at 10:24 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> I was trying to find references on how other runners handle
> @RequiresStableInput for batch cases, but I was not able to find any.
> In Flink I can see support added for the streaming case, and in Dataflow
> I can see that support for the feature was turned off:
> https://github.com/apache/beam/pull/8065
>
> It seems to me that @RequiresStableInput is ignored for the batch case
> and the runner relies on being able to recompute the whole job in the
> worst-case scenario. Is this assumption correct?
> Could I just change SparkRunner to crash on the @RequiresStableInput
> annotation in streaming mode and ignore it in batch?
>
> On Wed, Jul 1, 2020 at 10:27 AM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>> We have a component which we use in both streaming and batch jobs.
>> Streaming we run on FlinkRunner and batch on SparkRunner. Recently we
>> needed to add @RequiresStableInput to that component because of the
>> streaming use-case. But now the batch case crashes on SparkRunner with:
>>
>> Caused by: java.lang.UnsupportedOperationException: Spark runner currently doesn't support @RequiresStableInput annotation.
>>     at org.apache.beam.runners.core.construction.UnsupportedOverrideFactory.getReplacementTransform(UnsupportedOverrideFactory.java:58)
>>     at org.apache.beam.sdk.Pipeline.applyReplacement(Pipeline.java:556)
>>     at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:292)
>>     at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:210)
>>     at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:168)
>>     at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:90)
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
>>     at com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:42)
>>     at com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:35)
>>     at scala.util.Try$.apply(Try.scala:192)
>>     at com.dp.pipeline.driver.PipelineDriver$class.main(PipelineDriver.scala:35)
>>
>> We are using Beam 2.19.0. Is @RequiresStableInput problematic to support
>> for both the streaming and batch use-cases? What are the options here?
>> https://issues.apache.org/jira/browse/BEAM-5358
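For completeness, the shape of DoFn that hits the unsupported override in the trace above is just a method-level annotation (a minimal sketch; the class name and body are made up, only the placement of @RequiresStableInput matters):

    import org.apache.beam.sdk.transforms.DoFn;

    // Illustrative only; nothing here is from the real component.
    class StableWriteFn extends DoFn<String, Void> {
      // Declares that retries must see exactly the same input elements,
      // not a recomputed (possibly different) set.
      @RequiresStableInput
      @ProcessElement
      public void processElement(@Element String element) {
        // e.g. a non-idempotent external write keyed off the element
      }
    }

SparkRunner 2.19.0 registers an UnsupportedOverrideFactory for this annotation, so any pipeline containing such a DoFn fails during Pipeline.replaceAll() at job submission, which matches the stack trace above.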