Hi,

I've created issues for (1)
<https://issues.apache.org/jira/browse/FLINK-1091> and (2)
<https://issues.apache.org/jira/browse/FLINK-1092>.
Regarding (3), I'm not quite sure how it should be done or whether it
should be part of a more general iteration type, so I'd like to think about
it a bit more. Any thoughts and comments will be much appreciated :)

Cheers,
V.



On 4 September 2014 21:07, Fabian Hueske <fhue...@apache.org> wrote:

> Hi Vasia,
>
> thanks for the nice wrap-up :-)
> All of these would be very valuable enhancements of the system, IMO.
>
> I'd suggest you create JIRAs for the individual points.
> It would be very cool if you'd pick up some of the tasks. :-)
>
> Cheers, Fabian
>
>
> 2014-09-04 20:43 GMT+02:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:
>
> > Hello all,
> >
> > thank you Stephan and Aljoscha for the great input!
> >
> > Regarding my initial issue in this e-mail, I solved it a while ago by
> > (1) flattening the input before the join and (2) realizing that there was
> > an easier way to implement my algorithm than what I was trying to do o.O
> >
> > However, several interesting discussion points came out of this, so let me
> > try to summarize:
> >
> > (1) Support joining the solution set using key selectors.
> > This is the initial problem that started this discussion. As Stephan
> > explains above, a possible solution could be to provide explicit
> > "joinWithSolution" and "coGroupWithSolution" functions to make sure the
> > keys used are valid.
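> >
> > To sketch how this could look from the user side (purely hypothetical: the
> > "joinWithSolution" call, the Vertex type, and the ComputeUpdates function
> > are placeholders, not part of the current API):
> >
> > // the solution set key is fixed once, when the delta iteration is defined
> > DeltaIteration<Vertex, Vertex> iteration =
> >         initialSolution.iterateDelta(initialWorkset, maxIterations, 0);
> >
> > // hypothetical: joins the workset with the solution set, reusing the key
> > // declared above, so no mismatching key can be chosen here
> > DataSet<Vertex> delta = iteration.getWorkset()
> >         .joinWithSolution(new ComputeUpdates());
> >
> > DataSet<Vertex> result = iteration.closeWith(delta, delta);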
> >
> > (2) Allow operations other than join and cogroup on the solution set.
> > For this one, I would first like to understand whether restricting the
> > operations to join and cogroup was a requirement for some reason (like
> > preventing invalid constructs) or whether you simply did not see a use
> > case for it. The solution here could be to use special functions or
> > constructs, like Aljoscha suggests.
> >
> > (3) Support types of iterations where the result does not have to be in a
> > "solution set" form. An example is the one Aljoscha gives with outputing
> > one element per iteration and concatenating them. I like this idea, but I
> > think this requires a bit more thought and should be something more
> > general.
> >
> > Do you have any other related issues to add?
> >
> > Personally, I'm interested in all of the above and I believe I can dedicate
> > some time to work on them :)
> > So, what do you think?
> >
> > Cheers,
> > V.
> >
> >
> > On 31 August 2014 08:54, Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > > Hi guys,
> > > sorry for being so late in this discussion, but I was thinking about
> > > how we could make iterations more intuitive. My idea would be to make
> > > access to the solution set explicit in Delta Iterations. It should be
> > > usable as input for any operation and updates to it would be done
> > > through a special construct, not by joining with it. Maybe we could
> > > have a special UpdateSolution operator which would make sure that the
> > > data is partitioned correctly for the distributed hash table in which
> > > the solution is stored. Or we could have a function on the Iteration
> > > Context that can update the solution from an arbitrary operator. In my
> > > idea, Bulk Iteration would be the base operation and the other
> > > Iterations would add special features to it.  I also would like to add
> > > an iteration type where you can output an element in every iteration
> > > and the result is the concatenation of those elements. This seems to
> > > be required for some machine-learning-style algorithms.
> > >
> > > What do you guys think about this? I'm not sure on the details here
> > > since I'm busy with other low-level stuff (Generalizing Pair
> > > Comparators and the Scala API) but I was planning to tackle this
> > > afterwards. If someone else wanted to look into this I'd be happy to
> > > help and discuss, though. :D
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > On Sat, Aug 30, 2014 at 6:27 PM, Stephan Ewen <se...@apache.org> wrote:
> > > > Addendum: The issue to enforce partitioning of the data set is tracked
> > > > here: https://issues.apache.org/jira/browse/FLINK-1060
> > > >
> > > >
> > > > On Sat, Aug 30, 2014 at 6:01 PM, Stephan Ewen <se...@apache.org> wrote:
> > > >
> > > >> Hey!
> > > >>
> > > >> Sorry for the late reply, but here are some thoughts in that direction:
> > > >>
> > > >>
> > > >> Stateful operations:
> > > >>
> > > >> In order to keep state (in the form of a hash map or so) around in
> > > >> iterations, you do not need to use the restricted means that the delta
> > > >> iterations give you. All function instances stay around for all supersteps,
> > > >> so you can actually just create a map inside the function and update it as
> > > >> you like.
> > > >>
> > > >> There is some example code at the end of the mail (listing 1), or nicely
> > > >> formatted in this gist:
> > > >> https://gist.github.com/StephanEwen/7815c1f269f1f79b8e09
> > > >>
> > > >>
> > > >> State across functions:
> > > >>
> > > >> If you actually want to have state that you access from multiple
> > > >> functions, that is also possible. Use a static broker to fetch the map,
> > > >> like in listing 2 or this gist:
> > > >> https://gist.github.com/StephanEwen/5cdfa628d0e05b99f328
> > > >>
> > > >>
> > > >> Making sure data is partitioned:
> > > >>
> > > >> The trickier part is making sure that the data is partitioned when it
> > > >> enters the functions. Right now, this needs a hack: a GroupReduce function
> > > >> that simply emits everything. This will cause an unnecessary and costly
> > > >> sort, so we need to get that out of the way. Until then, it should allow
> > > >> you to write a prototype.
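> > > >>
> > > >> For reference, a minimal sketch of that hack (assuming an Iterable-based
> > > >> GroupReduceFunction and that Edge is a Tuple type keyed on field 0; the
> > > >> class name is a placeholder):
> > > >>
> > > >> // identity GroupReduce: forces grouping/partitioning on the key and simply
> > > >> // re-emits every element (at the cost of the unnecessary sort noted above)
> > > >> public static class ForcePartition implements GroupReduceFunction<Edge, Edge> {
> > > >>     @Override
> > > >>     public void reduce(Iterable<Edge> values, Collector<Edge> out) {
> > > >>         for (Edge e : values) {
> > > >>             out.collect(e);
> > > >>         }
> > > >>     }
> > > >> }
> > > >>
> > > >> // usage: edges.groupBy(0).reduceGroup(new ForcePartition()).mapPartition(new MyMapper());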
> > > >>
> > > >> The good news is that we are planning to add hints to tell the system to
> > > >> partition/rebalance/etc. data sets for operations. This would solve the
> > > >> issue nicely. It is not a terribly large change, so it should not take too
> > > >> long. I'll keep you posted on the progress...
> > > >>
> > > >> Greetings,
> > > >> Stephan
> > > >>
> > > >>
> > > >> ---------------------------------------------------------
> > > >> Sample 1: Flexible State in a Mapper
> > > >> ---------------------------------------------------------
> > > >>
> > > >> public static class MyMapper extends RichMapPartitionFunction<Edge, Edge> {
> > > >>
> > > >>     private final Map<Long, Edge> state = new HashMap<Long, Edge>();
> > > >>
> > > >>     @Override
> > > >>     public void open(Configuration conf) throws Exception {
> > > >>         // load the state
> > > >>         if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
> > > >>             String pathToFragment = "hdfs://... or file://...";
> > > >>             CsvInputFormat<Edge> reader = new CsvInputFormat<>(new Path(pathToFragment));
> > > >>             reader.configure(new Configuration());
> > > >>             reader.open(new FileInputSplit(0, new Path(pathToFragment), 0, 36584, null));
> > > >>             while (!reader.reachedEnd()) {
> > > >>                 Edge next = reader.nextRecord(new Edge());
> > > >>                 state.put(next.f0, next);
> > > >>             }
> > > >>         }
> > > >>     }
> > > >>
> > > >>     @Override
> > > >>     public void close() {
> > > >>         // check whether to write the state out
> > > >>         if (getIterationRuntimeContext().getSuperstepNumber() == 42) {
> > > >>             // write the state (similar code as in open() for the reader)
> > > >>         }
> > > >>     }
> > > >>
> > > >>     @Override
> > > >>     public void mapPartition(Iterable<Edge> records, Collector<Edge> out) throws Exception {
> > > >>         // do something with the state
> > > >>         for (Edge e : records) {
> > > >>             Edge other = state.get(e.f0);
> > > >>             // do something
> > > >>             state.put(...);
> > > >>         }
> > > >>     }
> > > >> }
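> > > >>
> > > >> A minimal sketch of how this mapper could be plugged into an iteration
> > > >> (assuming the input DataSet is called "edges"; the 42 supersteps match the
> > > >> check in close() above):
> > > >>
> > > >> IterativeDataSet<Edge> iteration = edges.iterate(42);
> > > >> // the MyMapper instances (and their state maps) live across all supersteps
> > > >> DataSet<Edge> nextEdges = iteration.mapPartition(new MyMapper());
> > > >> DataSet<Edge> result = iteration.closeWith(nextEdges);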
> > > >>
> > > >>
> > > >> =======================================================
> > > >>
> > > >> ---------------------------------------------------------
> > > >> Sample 2: Sharing state across functions
> > > >> ---------------------------------------------------------
> > > >>
> > > >> public class StateBroker {
> > > >>
> > > >>     public static final ConcurrentHashMap<Integer, Map<Long, Edge>> BROKER =
> > > >>             new ConcurrentHashMap<>();
> > > >>
> > > >>     public static Map<Long, Edge> getForSubtask(int subtask) {
> > > >>         Map<Long, Edge> entry = BROKER.get(subtask);
> > > >>         if (entry == null) {
> > > >>             entry = new HashMap<>();
> > > >>             Map<Long, Edge> previous = BROKER.putIfAbsent(subtask, entry);
> > > >>             entry = previous == null ? entry : previous;
> > > >>         }
> > > >>         return entry;
> > > >>     }
> > > >> }
> > > >>
> > > >>
> > > >> public static class MyMapper extends RichMapPartitionFunction<Edge, Edge> {
> > > >>
> > > >>     private Map<Long, Edge> state;
> > > >>
> > > >>     @Override
> > > >>     public void open(Configuration conf) throws Exception {
> > > >>         // load the state
> > > >>         if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
> > > >>             state = StateBroker.getForSubtask(getRuntimeContext().getIndexOfThisSubtask());
> > > >>         }
> > > >>     }
> > > >>
> > > >>     ...
> > > >> }
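> > > >>
> > > >> And just to illustrate the sharing part: any other rich function with the
> > > >> same subtask index (and running in the same JVM) can fetch the identical
> > > >> map in its open() method. A hypothetical example (MyFilter is not part of
> > > >> the gist):
> > > >>
> > > >> public static class MyFilter extends RichFilterFunction<Edge> {
> > > >>
> > > >>     private Map<Long, Edge> state;
> > > >>
> > > >>     @Override
> > > >>     public void open(Configuration conf) throws Exception {
> > > >>         // same subtask index -> same map instance as in MyMapper
> > > >>         state = StateBroker.getForSubtask(getRuntimeContext().getIndexOfThisSubtask());
> > > >>     }
> > > >>
> > > >>     @Override
> > > >>     public boolean filter(Edge e) {
> > > >>         return state.containsKey(e.f0);
> > > >>     }
> > > >> }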
> > > >>
> > > >>
> > > >>
> > > >> On Thu, Jul 31, 2014 at 4:15 PM, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:
> > > >>
> > > >>> Hey,
> > > >>>
> > > >>> thanks for replying so fast :)
> > > >>>
> > > >>> I saw the discussion in a previous thread concerning changing the API to
> > > >>> offer more explicit join functions.
> > > >>> I think providing these special functions is a good way to disable any
> > > >>> other kind of interaction with the solution set.
> > > >>>
> > > >>> However, as a user, I would like to have a more flexible way of
> > > >>> interacting
> > > >>> with the state of the iteration.
> > > >>> In the program I'm describing above, I actually want to join on the value
> > > >>> of the solution set, not the key.
> > > >>> It would be nice to somehow have access to the solution set as any other
> > > >>> normal DataSet.
> > > >>>
> > > >>> I'm not sure how this could be supported, but if you think this is a good
> > > >>> idea, I could work on this!
> > > >>>
> > > >>> Cheers,
> > > >>> V.
> > > >>>
> > > >>>
> > > >>>
> > > >>> On 31 July 2014 15:50, Stephan Ewen <se...@apache.org> wrote:
> > > >>>
> > > >>> > Hi Vasia!
> > > >>> >
> > > >>> > There is no fundamental reason; we simply have not gotten around to
> > > >>> > implementing it yet. Any help along these lines is highly welcome.
> > > >>> >
> > > >>> > One reason that held us back is that we need to make sure that the key
> > > >>> > of the solution set and the key of the join are the same.
> > > >>> > That is hard to verify with general functions. One approach is to
> > > >>> > actually change the delta iteration API to define the keys only at
> > > >>> > one place (the definition of the iteration), and offer special
> > > >>> > "joinWithSolution" and "coGroupWithSolution" functions, rather than
> > > >>> > using the regular join syntax (which allows creating invalid
> > > >>> > constructs).
> > > >>> >
> > > >>> > What are your thoughts on this, from a DeltaIteration user perspective?
> > > >>> >
> > > >>> > Greetings,
> > > >>> > Stephan
> > > >>> >
> > > >>>
> > > >>
> > > >>
> > >
> >
>
