Would it then be safe to enable the same behavior for Spark batch? I can
create a JIRA and a patch for this if there is no other reason not to
do so.
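
For reference, here is how I understand the streaming behavior Max
describes below, as a minimal plain-Java sketch. This is not Flink Runner
or Beam code: the class and method names (StableInputBuffer, onElement,
onCheckpointComplete) are made up for illustration, and a real runner
would presumably keep the buffer in checkpointed state rather than in a
plain in-memory list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical illustration only; not part of Beam or the Flink Runner.
 * Elements are held back until a checkpoint completes, so the downstream
 * function only sees input that has already been made durable.
 */
class StableInputBuffer<T> {
    // Elements received since the last completed checkpoint.
    private final List<T> pending = new ArrayList<>();
    // Stand-in for the DoFn that requires stable input.
    private final Consumer<T> downstream;

    StableInputBuffer(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** Called for each arriving element: buffer it, do not process it yet. */
    void onElement(T element) {
        pending.add(element);
    }

    /** Called once a checkpoint covering the buffered elements is complete. */
    void onCheckpointComplete() {
        for (T element : pending) {
            downstream.accept(element);
        }
        pending.clear();
    }

    /** Number of elements still waiting for a checkpoint. */
    int pendingCount() {
        return pending.size();
    }
}
```

In batch there is no such buffering step; as discussed below, a failed job
is simply re-run from the start.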

On Wed, Jul 8, 2020 at 11:51 AM Maximilian Michels <m...@apache.org> wrote:

> Correct, for batch we rely on re-running the entire job which will
> produce stable input within each run.
>
> For streaming, the Flink Runner buffers all input to a
> @RequiresStableInput DoFn until a checkpoint is complete; only then does
> it process the buffered data. Dataflow effectively does the same by going
> through the Shuffle service, which produces a consistent result.
>
> -Max
>
> On 08.07.20 11:08, Jozef Vilcek wrote:
> > My last question was more towards the graph translation for batch mode.
> >
> > Should a DoFn with @RequiresStableInput be translated/expanded in some
> > specific way (e.g. DoFn -> Reshuffle + DoFn), or is that not needed for
> > batch?
> > Most runners fail in the presence of @RequiresStableInput for both batch
> > and streaming. I cannot find such a failure for Flink and Dataflow, but at
> > the same time I cannot find what those runners do with such a DoFn.
> >
> > On Tue, Jul 7, 2020 at 9:18 PM Kenneth Knowles <k...@apache.org> wrote:
> >
> >     I hope someone who knows better than me can respond.
> >
> >     A long time ago, the SparkRunner added a call to materialize() at
> >     every GroupByKey. This was to mimic Dataflow, since so many of the
> >     initial IO transforms relied on using shuffle to create stable
> >     inputs.
> >
> >     The overall goal is to be able to remove these extra calls to
> >     materialize() and only include them when @RequiresStableInput is
> >     present.
> >
> >     The intermediate state is to analyze whether input is already stable
> >     from materialize() and add another materialize() only if it is not
> >     stable.
> >
> >     I don't know the current state of the SparkRunner. This may already
> >     have changed.
> >
> >     Kenn
> >
> >     On Thu, Jul 2, 2020 at 10:24 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >
> >         I was trying to look for references on how other runners handle
> >         @RequiresStableInput for batch cases, however I was not able to
> >         find any.
> >         In Flink I can see added support for streaming case and in
> >         Dataflow I see that support for the feature was turned off
> >         https://github.com/apache/beam/pull/8065
> >
> >         It seems to me that @RequiresStableInput is ignored for the
> >         batch case and the runner relies on being able to recompute the
> >         whole job in the worst case scenario.
> >         Is this assumption correct?
> >         Could I just change SparkRunner to crash on @RequiresStableInput
> >         annotation for streaming mode and ignore it in batch?
> >
> >
> >
> >         On Wed, Jul 1, 2020 at 10:27 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >
> >             We have a component which we use in streaming and batch
> >             jobs. Streaming we run on FlinkRunner and batch on
> >             SparkRunner. Recently we needed to add @RequiresStableInput
> >             to that component because of the streaming use case. But now
> >             the batch case crashes on SparkRunner with
> >
> >             Caused by: java.lang.UnsupportedOperationException: Spark runner currently doesn't support @RequiresStableInput annotation.
> >               at org.apache.beam.runners.core.construction.UnsupportedOverrideFactory.getReplacementTransform(UnsupportedOverrideFactory.java:58)
> >               at org.apache.beam.sdk.Pipeline.applyReplacement(Pipeline.java:556)
> >               at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:292)
> >               at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:210)
> >               at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:168)
> >               at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:90)
> >               at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
> >               at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
> >               at com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:42)
> >               at com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:35)
> >               at scala.util.Try$.apply(Try.scala:192)
> >               at com.dp.pipeline.driver.PipelineDriver$class.main(PipelineDriver.scala:35)
> >
> >
> >             We are using Beam 2.19.0. Is the @RequiresStableInput
> >             problematic to support for both streaming and batch
> >             use-case? What are the options here?
> >             https://issues.apache.org/jira/browse/BEAM-5358
> >
>
