Hi Simone,

This can be done with a map followed by a reduce. DataSet#count leverages
accumulators, which perform an inherent reduce. As an optimization,
DataSet#count uses an output format (Utils.CountHelper, a RichOutputFormat)
so that only a single operator is required. Previously, the counting and
accumulating were handled in a RichMapFunction followed by a
DiscardingOutputFormat.
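
To make the map-then-reduce approach concrete, here is a minimal sketch of
that counting pattern in plain Java streams, standing in for the Flink
operators (the class and method names are illustrative, not the real
DataSet API):

```java
import java.util.List;

public class LazyCountSketch {

    // Map each element to 1L, then reduce by summing -- the classic
    // two-operator way to count a data set without a dedicated counter.
    public static long count(List<?> elements) {
        return elements.stream()
                .map(e -> 1L)           // "map" stage: element -> 1
                .reduce(0L, Long::sum); // "reduce" stage: sum the ones
    }

    public static void main(String[] args) {
        long n = count(List.of("a", "b", "c"));
        System.out.println("count = " + n); // count = 3
    }
}
```

The accumulator-based DataSet#count avoids the extra reduce operator by
tallying inside the output format itself.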

This has come up in Gelly, since graphs have global metrics such as the
Global Clustering Coefficient, for which we need to count a DataSet of
triangles as well as the number of triplets. It would be great if there
were a way to both return a DataSet and compute an accumulated value in a
single, optimal pass.
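
The shape of what I'm after could be sketched like this in plain Java: a
pass-through map that increments a counter as records flow by, so the data
is still available downstream while the count accumulates (the names here
are hypothetical stand-ins for a RichMapFunction with an accumulator, not a
real Flink API):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class CountingPassThrough {

    // Pass records through unchanged while accumulating a count, mimicking
    // the RichMapFunction-with-accumulator pattern: the caller gets both
    // the data and the tally from a single traversal.
    public static <T> List<T> passThroughCount(List<T> input, AtomicLong counter) {
        return input.stream()
                .peek(e -> counter.incrementAndGet()) // side-channel count
                .collect(Collectors.toList());        // data still flows on
    }

    public static void main(String[] args) {
        AtomicLong triangles = new AtomicLong();
        List<String> out = passThroughCount(List.of("t1", "t2", "t3"), triangles);
        System.out.println(out.size() + " records, count = " + triangles.get());
    }
}
```

The difference from this sketch is that in Flink the accumulated value is
only available after the job executes, which is exactly the awkwardness
described above.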

Greg

On Mon, May 30, 2016 at 6:27 PM, Simone Robutti <
simone.robu...@radicalbit.io> wrote:

> On this same subject, I have a question. Is it possible to achieve a lazy
> count that transforms a DataSet[T] to a DataSet[Long] with a single value
> containing the length of the original DataSet? Otherwise, what is the best
> way to count the elements lazily?
>
> 2016-05-30 23:49 GMT+02:00 Stephan Ewen <se...@apache.org>:
>
> > Hi Eron!
> >
> > Yes, the idea is to actually switch all executions to a backtracking
> > scheduling mode. That simultaneously solves both fine grained recovery
> and
> > lazy execution, where later stages build on prior stages.
> >
> > With all the work around streaming, we have not gotten to this so far,
> but
> > it is one feature still in the list...
> >
> > Greetings,
> > Stephan
> >
> >
> > On Mon, May 30, 2016 at 9:55 PM, Eron Wright <ewri...@live.com> wrote:
> >
> > > Thinking out loud now…
> > >
> > > Is the job graph fully mutable?   Can it be cleared?   For example,
> > > shouldn’t the count method remove the sink after execution completes?
> > >
> > > Can numerous job graphs co-exist within a single driver program?    How
> > > would that relate to the session concept?
> > >
> > > Seems the count method should use ‘backtracking’ schedule mode, and
> only
> > > execute the minimum needed to materialize the count sink.
> > >
> > > > On May 29, 2016, at 3:08 PM, Márton Balassi <
> balassi.mar...@gmail.com>
> > > wrote:
> > > >
> > > > Hey Eron,
> > > >
> > > > Yes, DataSet#collect and count methods implicitly trigger a JobGraph
> > > > execution, thus they also trigger writing to any previously defined
> > > sinks.
> > > > The idea behind this behavior is to enable interactive querying (the
> > one
> > > > that you are used to get from a shell environment) and it is also a
> > great
> > > > debugging tool.
> > > >
> > > > Best,
> > > >
> > > > Marton
> > > >
> > > > On Sun, May 29, 2016 at 11:28 PM, Eron Wright <ewri...@live.com>
> > wrote:
> > > >
> > > >> I was curious as to how the `count` method on DataSet worked, and
> was
> > > >> surprised to see that it executes the entire program graph.
>  Wouldn’t
> > > this
> > > >> cause undesirable side-effects like writing to sinks?    Also
> strange
> > > that
> > > >> the graph is mutated with the addition of a sink (that isn’t
> > > subsequently
> > > >> removed).
> > > >>
> > > >> Surveying the Flink code, there aren’t many situations where the
> > program
> > > >> graph is implicitly executed (`collect` is another).   Nonetheless,
> > this
> > > >> has deepened my appreciation for how dynamic the application might
> be.
> > > >>
> > > >> // DataSet.java
> > > >> public long count() throws Exception {
> > > >>   final String id = new AbstractID().toString();
> > > >>
> > > >>   output(new Utils.CountHelper<T>(id)).name("count()");
> > > >>
> > > >>   JobExecutionResult res = getExecutionEnvironment().execute();
> > > >>   return res.<Long> getAccumulatorResult(id);
> > > >> }
> > > >> Eron
> > >
> > >
> >
>