Hi Eron!

Yes, the idea is to actually switch all executions to a backtracking
scheduling mode. That simultaneously solves both fine grained recovery and
lazy execution, where later stages build on prior stages.

With all the work around streaming, we have not gotten to this so far, but
it is one feature still in the list...

Greetings,
Stephan


On Mon, May 30, 2016 at 9:55 PM, Eron Wright <ewri...@live.com> wrote:

> Thinking out loud now…
>
> Is the job graph fully mutable?   Can it be cleared?   For example,
> shouldn’t the count method remove the sink after execution completes?
>
> Can numerous job graphs co-exist within a single driver program?    How
> would that relate to the session concept?
>
> Seems the count method should use ‘backtracking’ schedule mode, and only
> execute the minimum needed to materialize the count sink.
>
> > On May 29, 2016, at 3:08 PM, Márton Balassi <balassi.mar...@gmail.com>
> wrote:
> >
> > Hey Eron,
> >
> > Yes, DataSet#collect and count methods implicitly trigger a JobGraph
> > execution, thus they also trigger writing to any previously defined
> sinks.
> > The idea behind this behavior is to enable interactive querying (the one
> > that you are used to get from a shell environment) and it is also a great
> > debugging tool.
> >
> > Best,
> >
> > Marton
> >
> > On Sun, May 29, 2016 at 11:28 PM, Eron Wright <ewri...@live.com> wrote:
> >
> >> I was curious as to how the `count` method on DataSet worked, and was
> >> surprised to see that it executes the entire program graph.   Wouldn’t
> this
> >> cause undesirable side-effects like writing to sinks?    Also strange
> that
> >> the graph is mutated with the addition of a sink (that isn’t
> subsequently
> >> removed).
> >>
> >> Surveying the Flink code, there aren’t many situations where the program
> >> graph is implicitly executed (`collect` is another).   Nonetheless, this
> >> has deepened my appreciation for how dynamic the application might be.
> >>
> >> // DataSet.java
> >> public long count() throws Exception {
> >>   final String id = new AbstractID().toString();
> >>
> >>   output(new Utils.CountHelper<T>(id)).name("count()");
> >>
> >>   JobExecutionResult res = getExecutionEnvironment().execute();
> >>   return res.<Long> getAccumulatorResult(id);
> >> }
> >> Eron
>
>

Reply via email to