Hi,

I've created issues for (1) <https://issues.apache.org/jira/browse/FLINK-1091> and (2) <https://issues.apache.org/jira/browse/FLINK-1092>.
Regarding (3), I'm not quite sure how it should be done or whether it should be part of a more general iteration type, so I'd like to think about it a bit more. Any thoughts and comments will be much appreciated :)
Cheers,
V.

On 4 September 2014 21:07, Fabian Hueske <fhue...@apache.org> wrote:

> Hi Vasia,
>
> thanks for the nice wrap-up :-)
> All of these would be very valuable enhancements of the system, IMO.
>
> I'd suggest you create JIRAs for the individual points.
> It would be very cool if you'd pick up some of the tasks. :-)
>
> Cheers, Fabian
>
>
> 2014-09-04 20:43 GMT+02:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:
>
> > Hello all,
> >
> > thank you Stephan and Aljoscha for the great input!
> >
> > Regarding my initial issue in this e-mail, I solved it a while ago by
> > (1) flattening the input before the join and (2) realizing that there was
> > an easier way to implement my algorithm than what I was trying to do o.O
> >
> > However, several interesting discussion points came out of this, so let me
> > try to summarize:
> >
> > (1) Support joining the solution set using key selectors.
> > This is the initial problem that started this discussion. As Stephan
> > explains earlier in this thread, a possible solution is to provide explicit
> > functions, "joinWithSolution" and "coGroupWithSolution", to make sure the
> > keys used are valid.
> >
> > (2) Allow operations other than join and coGroup on the solution set.
> > For this one, I would first like to understand whether restricting the
> > operations to join and coGroup was a requirement for some reason (like
> > disallowing invalid constructs) or whether you simply did not see the use
> > case for it. The solution here could be special functions or constructs,
> > as Aljoscha suggests.
> >
> > (3) Support types of iterations where the result does not have to be in
> > "solution set" form. An example is the one Aljoscha gives: outputting one
> > element per iteration and concatenating them. I like this idea, but I
> > think it requires a bit more thought and should be something more
> > general.
> >
> > Do you have any other related issues to add?
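Point (3), an iteration whose result is the concatenation of one element per superstep, can be illustrated with a minimal single-machine sketch. The sketch assumes nothing about Flink's API. `ConcatenatingIteration` and `run` are hypothetical names. The `step` function plays the role of a bulk iteration's step function. Each superstep contributes exactly one element, and the result is the in-order concatenation of those elements rather than a final solution set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch (not a Flink API): an iteration whose result is the
// concatenation of one element emitted per superstep, instead of a final
// solution set.
final class ConcatenatingIteration {

    private ConcatenatingIteration() {}

    // Runs maxSupersteps supersteps. Each superstep applies `step` to the
    // current state (like a bulk iteration's step function) and then emits
    // emit(newState); the emitted elements are returned in superstep order.
    static <S, E> List<E> run(S initial, Function<S, S> step, Function<S, E> emit, int maxSupersteps) {
        List<E> result = new ArrayList<>();
        S state = initial;
        for (int superstep = 1; superstep <= maxSupersteps; superstep++) {
            state = step.apply(state);
            result.add(emit.apply(state));
        }
        return result;
    }

    public static void main(String[] args) {
        // Double the state in every superstep and emit the new value.
        List<Long> out = run(1L, x -> x * 2, x -> x, 5);
        System.out.println(out); // prints [2, 4, 8, 16, 32]
    }
}
```

A distributed version would additionally need the runtime to collect the per-superstep outputs. The sketch only pins down the intended semantics.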
> >
> > Personally, I'm interested in all of the above and I believe I can dedicate
> > some time to work on them :)
> > So, what do you think?
> >
> > Cheers,
> > V.
> >
> >
> > On 31 August 2014 08:54, Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > > Hi guys,
> > > sorry for being so late to this discussion, but I was thinking about
> > > how we could make iterations more intuitive. My idea would be to make
> > > access to the solution set explicit in Delta Iterations. It should be
> > > usable as input for any operation, and updates to it would be done
> > > through a special construct, not by joining with it. Maybe we could
> > > have a special UpdateSolution operator which would make sure that the
> > > data is partitioned correctly for the distributed hash table in which
> > > the solution is stored. Or we could have a function on the iteration
> > > context that can update the solution from an arbitrary operator. In my
> > > idea, the Bulk Iteration would be the base operation and the other
> > > iterations would add special features to it. I would also like to add
> > > an iteration type where you can output an element in every iteration
> > > and the result is the concatenation of those elements. This seems to
> > > be required for some machine-learning-style algorithms.
> > >
> > > What do you guys think about this? I'm not sure about the details here
> > > since I'm busy with other low-level stuff (generalizing pair
> > > comparators and the Scala API), but I was planning to tackle this
> > > afterwards. If someone else wanted to look into this, I'd be happy to
> > > help and discuss, though.
> > > :D
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > On Sat, Aug 30, 2014 at 6:27 PM, Stephan Ewen <se...@apache.org> wrote:
> > > > Addendum: The issue to enforce partitioning of the data set is tracked
> > > > here: https://issues.apache.org/jira/browse/FLINK-1060
> > > >
> > > >
> > > > On Sat, Aug 30, 2014 at 6:01 PM, Stephan Ewen <se...@apache.org> wrote:
> > > >
> > > >> Hey!
> > > >>
> > > >> Sorry for the late reply, but here are some thoughts in that direction:
> > > >>
> > > >>
> > > >> Stateful operations:
> > > >>
> > > >> In order to keep state (in the form of a hash map or so) around in
> > > >> iterations, you need not use the restricted means that the delta
> > > >> iterations give you. All function instances stay around for all
> > > >> supersteps, so you can simply create a map inside the function and
> > > >> update it as you like.
> > > >>
> > > >> There is some example code at the end of this mail (listing 1, or
> > > >> nicely formatted in this gist):
> > > >> https://gist.github.com/StephanEwen/7815c1f269f1f79b8e09
> > > >>
> > > >>
> > > >> State across functions:
> > > >>
> > > >> If you actually want to have state that you access from multiple
> > > >> functions, that is also possible. Use a static broker to fetch the
> > > >> map, like in listing 2 or this gist:
> > > >> https://gist.github.com/StephanEwen/5cdfa628d0e05b99f328
> > > >>
> > > >>
> > > >> Making sure data is partitioned:
> > > >>
> > > >> The trickier part is making sure that the data is partitioned when it
> > > >> enters the functions. Right now, this needs a hack: a GroupReduce
> > > >> function that simply emits everything. This will cause an unnecessary
> > > >> and costly sort, so we need to get that out of the way. Until then, it
> > > >> should allow you to write a prototype.
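Two ideas can be simulated in plain Java:

* long-lived function instances that keep a map across supersteps, and
* partitioning by key, so that each key's state lives in exactly one instance.

This is a hypothetical sketch, not Flink code. `StatefulInstance` stands in for a rich function. `partitionFor` stands in for whatever partitioning the job applies (the GroupReduce hack, or the hints tracked in FLINK-1060). `Rec` is an illustrative record type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-JVM simulation (no Flink involved): one long-lived
// "function instance" per parallel subtask keeps a HashMap across supersteps,
// and records are hash-partitioned by key so that all updates for a key reach
// the same instance.
final class PartitionedStateSketch {

    // A record with a key and a value (stand-in for e.g. an Edge tuple).
    static final class Rec {
        final long key;
        final long value;

        Rec(long key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in for a rich function instance that survives all supersteps.
    static final class StatefulInstance {
        final Map<Long, Long> state = new HashMap<>(); // key -> running sum

        void processPartition(List<Rec> records) {
            for (Rec r : records) {
                state.merge(r.key, r.value, Long::sum);
            }
        }
    }

    // Hash partitioning: a given key always maps to the same subtask index.
    static int partitionFor(long key, int parallelism) {
        return Math.floorMod(Long.hashCode(key), parallelism);
    }

    // Feeds the same input to the instances in every superstep.
    static List<StatefulInstance> run(List<Rec> records, int parallelism, int supersteps) {
        List<StatefulInstance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new StatefulInstance()); // created once, reused in every superstep
        }
        for (int s = 1; s <= supersteps; s++) {
            List<List<Rec>> partitions = new ArrayList<>();
            for (int i = 0; i < parallelism; i++) {
                partitions.add(new ArrayList<>());
            }
            for (Rec r : records) {
                partitions.get(partitionFor(r.key, parallelism)).add(r);
            }
            for (int i = 0; i < parallelism; i++) {
                instances.get(i).processPartition(partitions.get(i));
            }
        }
        return instances;
    }

    public static void main(String[] args) {
        List<Rec> records = Arrays.asList(new Rec(1, 10), new Rec(2, 5), new Rec(1, 1));
        List<StatefulInstance> instances = run(records, 2, 3);
        for (int i = 0; i < instances.size(); i++) {
            // prints "subtask 0: {2=15}" and then "subtask 1: {1=33}"
            System.out.println("subtask " + i + ": " + instances.get(i).state);
        }
    }
}
```

Without the `partitionFor` routing (for example, with round-robin distribution), updates for the same key would land in several instances' maps. Each instance would then hold only a partial view of that key's state, which is the problem the partitioning step is meant to avoid.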
> > > >>
> > > >> The good news is that we are planning to add hints to tell the system
> > > >> to partition, rebalance, etc. data sets for operations. This would
> > > >> solve the issue nicely. It is not a terribly large change, so it
> > > >> should not take too long. I'll keep you posted on the progress...
> > > >>
> > > >> Greetings,
> > > >> Stephan
> > > >>
> > > >>
> > > >> ---------------------------------------------------------
> > > >> Sample 1: Flexible State in a Mapper
> > > >> ---------------------------------------------------------
> > > >>
> > > >> public static class MyMapper extends RichMapPartitionFunction<Edge, Edge> {
> > > >>
> > > >>     private final Map<Long, Edge> state = new HashMap<Long, Edge>();
> > > >>
> > > >>     @Override
> > > >>     public void open(Configuration conf) throws Exception {
> > > >>         // load the state once, in the first superstep; the function
> > > >>         // instance (and thus the map) survives all later supersteps
> > > >>         if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
> > > >>             String pathToFragment = "hdfs://... or file://...";
> > > >>             CsvInputFormat<Edge> reader = new CsvInputFormat<>(new Path(pathToFragment));
> > > >>             reader.configure(new Configuration());
> > > >>             // the split covers bytes [0, 36584) of the file; adjust the length to the fragment's size
> > > >>             reader.open(new FileInputSplit(0, new Path(pathToFragment), 0, 36584, null));
> > > >>             try {
> > > >>                 while (!reader.reachedEnd()) {
> > > >>                     Edge next = reader.nextRecord(new Edge());
> > > >>                     if (next != null) {  // nextRecord() may return null
> > > >>                         state.put(next.f0, next);
> > > >>                     }
> > > >>                 }
> > > >>             } finally {
> > > >>                 reader.close();
> > > >>             }
> > > >>         }
> > > >>     }
> > > >>
> > > >>     @Override
> > > >>     public void close() {
> > > >>         // check whether to write the state out
> > > >>         if (getIterationRuntimeContext().getSuperstepNumber() == 42) {
> > > >>             // write the state (similar code as in open() for the reader)
> > > >>         }
> > > >>     }
> > > >>
> > > >>     @Override
> > > >>     public void mapPartition(Iterable<Edge> records, Collector<Edge> out) throws Exception {
> > > >>         // do something with the state
> > > >>         for (Edge e : records) {
> > > >>             Edge other = state.get(e.f0);
> > > >>             // do something, then update the state, e.g. state.put(...);
> > > >>         }
> > > >>     }
> > > >> }
> > > >>
> > > >>
> > > >> =======================================================
> > > >>
> > > >> ---------------------------------------------------------
> > > >> Sample 2: Sharing state across functions
> > > >> ---------------------------------------------------------
> > > >>
> > > >> public class StateBroker {
> > > >>
> > > >>     public static final ConcurrentHashMap<Integer, Map<Long, Edge>> BROKER = new ConcurrentHashMap<>();
> > > >>
> > > >>     public static Map<Long, Edge> getForSubtask(int subtask) {
> > > >>         Map<Long, Edge> entry = BROKER.get(subtask);
> > > >>         if (entry == null) {
> > > >>             // the map may be shared by tasks running in different threads,
> > > >>             // so use a thread-safe map
> > > >>             entry = new ConcurrentHashMap<>();
> > > >>             Map<Long, Edge> previous = BROKER.putIfAbsent(subtask, entry);
> > > >>             entry = previous == null ? entry : previous;
> > > >>         }
> > > >>         return entry;
> > > >>     }
> > > >> }
> > > >>
> > > >>
> > > >> public static class MyMapper extends RichMapPartitionFunction<Edge, Edge> {
> > > >>
> > > >>     private Map<Long, Edge> state;
> > > >>
> > > >>     @Override
> > > >>     public void open(Configuration conf) throws Exception {
> > > >>         // fetch the shared state for this subtask (once, in the first superstep)
> > > >>         if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
> > > >>             state = StateBroker.getForSubtask(getRuntimeContext().getIndexOfThisSubtask());
> > > >>         }
> > > >>     }
> > > >>
> > > >>     ...
> > > >> }
> > > >>
> > > >>
> > > >>
> > > >> On Thu, Jul 31, 2014 at 4:15 PM, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:
> > > >>
> > > >>> Hey,
> > > >>>
> > > >>> thanks for replying so fast :)
> > > >>>
> > > >>> I saw the discussion in a previous thread concerning changing the API
> > > >>> to offer more explicit join functions.
> > > >>> I think providing these special functions is a good way to disable any
> > > >>> other kind of interaction with the solution set.
> > > >>>
> > > >>> However, as a user, I would like to have a more flexible way of
> > > >>> interacting with the state of the iteration.
> > > >>> In the program I'm describing above, I actually want to join on the
> > > >>> value of the solution set, not the key.
> > > >>> It would be nice to somehow have access to the solution set as any
> > > >>> other normal DataSet.
> > > >>>
> > > >>> I'm not sure how this could be supported, but if you think this is a
> > > >>> good idea, I could work on this!
> > > >>>
> > > >>> Cheers,
> > > >>> V.
> > > >>>
> > > >>>
> > > >>>
> > > >>> On 31 July 2014 15:50, Stephan Ewen <se...@apache.org> wrote:
> > > >>>
> > > >>> > Hi Vasia!
> > > >>> >
> > > >>> > There is no fundamental reason; we simply have not gotten around to
> > > >>> > implementing it yet. Any help along these lines is highly welcome.
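Vasia's request hinges on the difference between a key join and a value join on the solution set. That difference can be sketched with plain Java collections under a few stated assumptions:

* The solution set is modeled as one in-memory hash map keyed by the iteration key.
* `SolutionSetJoins`, `joinOnKey` and `joinOnValue` are hypothetical names.

A key join is one lookup per workset element. A value join has no index to probe, so this sketch builds an inverted index first. In the distributed case, the solution set is stored partitioned by key, so a value join would additionally need the data repartitioned on the value field.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: the solution set is modeled as one in-memory hash map
// keyed by the iteration key. A key join probes that map directly; a join on a
// field of the value has no index to probe, so one is built first.
final class SolutionSetJoins {

    private SolutionSetJoins() {}

    // Key join: one hash lookup per workset element; emits (key, value) pairs.
    static <K, V> List<Map.Entry<K, V>> joinOnKey(List<K> workset, Map<K, V> solutionSet) {
        List<Map.Entry<K, V>> out = new ArrayList<>();
        for (K k : workset) {
            V v = solutionSet.get(k);
            if (v != null) {
                out.add(new AbstractMap.SimpleEntry<>(k, v));
            }
        }
        return out;
    }

    // Value join: builds an inverted index (value field -> solution-set keys),
    // then probes it; emits (field, key) pairs.
    static <K, V, F> List<Map.Entry<F, K>> joinOnValue(List<F> workset, Map<K, V> solutionSet,
                                                       Function<V, F> field) {
        Map<F, List<K>> index = new HashMap<>();
        for (Map.Entry<K, V> e : solutionSet.entrySet()) {
            index.computeIfAbsent(field.apply(e.getValue()), ignored -> new ArrayList<>()).add(e.getKey());
        }
        List<Map.Entry<F, K>> out = new ArrayList<>();
        for (F f : workset) {
            for (K k : index.getOrDefault(f, Collections.emptyList())) {
                out.add(new AbstractMap.SimpleEntry<>(f, k));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // vertexId -> componentId
        Map<Long, Long> components = new HashMap<>();
        components.put(1L, 10L);
        components.put(2L, 10L);
        components.put(3L, 20L);
        System.out.println(joinOnKey(Arrays.asList(1L, 4L), components).size());        // prints 1
        System.out.println(joinOnValue(Arrays.asList(10L), components, c -> c).size()); // prints 2
    }
}
```

The index rebuild in `joinOnValue` touches the whole solution set on every call. That cost is what a value join pays in exchange for the flexibility of treating the solution set as a normal data set.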
> > > >>> >
> > > >>> > One reason that held us back is that we need to make sure that the key
> > > >>> > of the solution set and the key of the join are the same.
> > > >>> > That is hard to verify with general functions. One approach is to
> > > >>> > change the delta iteration API to define the keys in only one place
> > > >>> > (the definition of the iteration), and offer special "joinWithSolution"
> > > >>> > and "coGroupWithSolution" functions, rather than using the regular join
> > > >>> > syntax (which allows one to create invalid constructs).
> > > >>> >
> > > >>> > What are your thoughts on this, from a DeltaIteration user perspective?
> > > >>> >
> > > >>> > Greetings,
> > > >>> > Stephan
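Stephan's proposal is to declare the solution-set key once and expose only joinWithSolution-style access. It can be sketched as a hypothetical plain-Java model, not Flink's DeltaIteration API. `KeyedSolutionSet`, `joinWithSolution` and `update` are illustrative names. The key selector is fixed in the constructor, so a caller chooses only the workset's key, never a different solution-set key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch (not Flink's API): the solution-set key is declared once,
// when the "iteration" is created, and every access reuses it, so a join with
// the solution set cannot use a mismatching key on the solution side.
final class KeyedSolutionSet<K, T> {

    private final Function<T, K> solutionKey; // declared once, at iteration definition
    private final Map<K, T> index = new HashMap<>();

    KeyedSolutionSet(Collection<T> initial, Function<T, K> solutionKey) {
        this.solutionKey = solutionKey;
        update(initial);
    }

    // Only the workset's key is supplied; the solution side always uses solutionKey.
    <W, R> List<R> joinWithSolution(Collection<W> workset, Function<W, K> worksetKey,
                                    BiFunction<W, T, R> joiner) {
        List<R> out = new ArrayList<>();
        for (W w : workset) {
            T match = index.get(worksetKey.apply(w));
            if (match != null) {
                out.add(joiner.apply(w, match));
            }
        }
        return out;
    }

    // Merges a delta into the solution set, keyed by the same selector.
    void update(Collection<T> delta) {
        for (T t : delta) {
            index.put(solutionKey.apply(t), t);
        }
    }

    T get(K key) {
        return index.get(key);
    }
}

final class KeyedSolutionSetDemo {
    public static void main(String[] args) {
        // Solution set entries are {vertexId, componentId}, keyed by vertexId.
        KeyedSolutionSet<Long, long[]> solution = new KeyedSolutionSet<>(
                Arrays.asList(new long[] {1, 1}, new long[] {2, 2}), v -> v[0]);
        // Workset entry {vertexId = 2, candidateComponent = 1}: keep the smaller component ID.
        List<long[]> delta = solution.joinWithSolution(
                Collections.singletonList(new long[] {2, 1}),
                w -> w[0],
                (w, s) -> w[1] < s[1] ? new long[] {s[0], w[1]} : s);
        solution.update(delta);
        System.out.println(solution.get(2L)[1]); // prints 1
    }
}
```

The demo performs one step in the style of connected components: vertex 2 adopts the smaller component ID 1. The keying rule lives in a single place, so the class never has to verify that an arbitrary user-supplied join key matches the solution-set key.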