Hey!

Sorry for the late reply, but here are some thoughts in that direction:


Stateful operations:

In order to keep state (in the form of a hash map or so) around across
iterations, you need not use the restricted means that the delta iterations
give you. All function instances stay around for all supersteps, so you
can actually just create a map inside that function and update it as you
like.

There is some example code at the end of the mail (sample 1), or nicely
formatted in this gist:
https://gist.github.com/StephanEwen/7815c1f269f1f79b8e09


State across functions:

If you actually want to have state that you access from multiple functions,
that is also possible. Use a static broker to fetch the map, like in
sample 2 or this gist:
https://gist.github.com/StephanEwen/5cdfa628d0e05b99f328
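
To try the broker idea without a cluster, here is a minimal plain-Java sketch.
It is an illustration only, not Flink code: the class name BrokerSketch and
the String values are made up, and computeIfAbsent (Java 8) stands in for the
putIfAbsent dance. It shows that two callers asking for the same subtask
index get the same map instance.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a static state broker; not part of any Flink API.
final class BrokerSketch {

    // One state map per parallel subtask index, shared through a static field.
    private static final ConcurrentHashMap<Integer, Map<Long, String>> BROKER =
            new ConcurrentHashMap<>();

    // Returns the map for the given subtask, creating it atomically on first use.
    static Map<Long, String> getForSubtask(int subtask) {
        return BROKER.computeIfAbsent(subtask, key -> new HashMap<>());
    }

    public static void main(String[] args) {
        // The "first function" of subtask 0 writes into the shared state.
        Map<Long, String> seenByFirst = getForSubtask(0);
        seenByFirst.put(7L, "edge-7");

        // The "second function" of subtask 0 fetches the same map and sees the entry.
        Map<Long, String> seenBySecond = getForSubtask(0);
        System.out.println(seenBySecond.get(7L));

        // A different subtask gets its own, still empty, map.
        System.out.println(getForSubtask(1).isEmpty());
    }
}
```

Keep in mind that a static field is only shared inside one JVM, and that the
inner HashMap is not thread-safe, so functions that touch it from different
threads would need their own synchronization.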


Making sure data is partitioned:

The trickier part is making sure that the data is partitioned when it
enters the functions. Right now, this needs a hack: a GroupReduce function
that simply emits every record it receives. The grouping forces the
partitioning, but it also causes an unnecessary and costly sort, which we
need to get out of the way. Until then, it should allow you to write a
prototype.
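
To illustrate why the partitioning matters for per-subtask state, here is a
small plain-Java sketch. It is not Flink code and does not reproduce Flink's
actual partitioner: the class PartitionSketch, the method names, and the
modulo-of-hash routing rule are all assumptions made for the example. The
point is only that every record with a given key lands in the same subtask,
so that subtask's local map holds the complete state for the key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of key-based partitioning; not part of any Flink API.
final class PartitionSketch {

    // Assumed routing rule: a given key always maps to the same subtask index.
    static int subtaskFor(long key, int parallelism) {
        return Math.floorMod(Long.hashCode(key), parallelism);
    }

    // Splits {key, value} records into one list per subtask.
    static List<List<long[]>> partition(List<long[]> records, int parallelism) {
        List<List<long[]>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (long[] record : records) {
            partitions.get(subtaskFor(record[0], parallelism)).add(record);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<long[]> records = Arrays.asList(
                new long[] {1L, 10L}, new long[] {2L, 20L},
                new long[] {1L, 11L}, new long[] {5L, 50L});

        List<List<long[]>> partitions = partition(records, 4);
        for (int subtask = 0; subtask < partitions.size(); subtask++) {
            // Each subtask keeps its own local state map, keyed like the records.
            Map<Long, Long> localState = new HashMap<>();
            for (long[] record : partitions.get(subtask)) {
                localState.merge(record[0], record[1], Long::sum);
            }
            System.out.println("subtask " + subtask + " state: " + localState);
        }
    }
}
```

Without such a guarantee, two records with the same key could end up in
different subtasks, and each local map would only see part of the picture.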

The good news is that we are planning to add hints to tell the system to
partition/rebalance/etc data sets for operations. This would solve the
issue nicely. It is not a terribly large change, so it should not take too
long. I'll keep you posted on the progress...

Greetings,
Stephan


---------------------------------------------------------
Sample 1: Flexible State in a Mapper
---------------------------------------------------------

public static class MyMapper extends RichMapPartitionFunction<Edge, Edge> {

        private final Map<Long, Edge> state = new HashMap<Long, Edge>();

        @Override
        public void open(Configuration conf) throws Exception {
                // load the state
                if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
                        String pathToFragment = "hdfs://... or file://...";
                        CsvInputFormat<Edge> reader = new CsvInputFormat<>(new Path(pathToFragment));
                        reader.configure(new Configuration());
                        reader.open(new FileInputSplit(0, new Path(pathToFragment), 0, 36584, null));
                        while (!reader.reachedEnd()) {
                                Edge next = reader.nextRecord(new Edge());
                                state.put(next.f0, next);
                        }
                }
        }

        @Override
        public void close() {
                // check whether to write the state out
                if (getIterationRuntimeContext().getSuperstepNumber() == 42) {
                        // write the state (similar code as in open() for the reader)
                }
        }

        @Override
        public void mapPartition(Iterable<Edge> records, Collector<Edge> out) throws Exception {
                // do something with the state
                for (Edge e : records) {
                        Edge other = state.get(e.f0);
                        // do something
                        state.put(...);
                }
        }
}


=======================================================

---------------------------------------------------------
Sample 2: Sharing state across functions
---------------------------------------------------------

public class StateBroker {

        public static final ConcurrentHashMap<Integer, Map<Long, Edge>> BROKER = new ConcurrentHashMap<>();

        public static Map<Long, Edge> getForSubtask(int subtask) {
                Map<Long, Edge> entry = BROKER.get(subtask);
                if (entry == null) {
                        entry = new HashMap<>();
                        Map<Long, Edge> previous = BROKER.putIfAbsent(subtask, entry);
                        entry = previous == null ? entry : previous;
                }
                return entry;
        }
}

public static class MyMapper extends RichMapPartitionFunction<Edge, Edge> {

        private Map<Long, Edge> state;

        @Override
        public void open(Configuration conf) throws Exception {
                // load the state
                if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
                        state = StateBroker.getForSubtask(getRuntimeContext().getIndexOfThisSubtask());
                }
        }
        ...
}



On Thu, Jul 31, 2014 at 4:15 PM, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:

> Hey,
>
> thanks for replying so fast :)
>
> I saw the discussion in a previous thread concerning changing the API to
> offer more explicit join functions.
> I think providing these special functions is a good way to disable any
> other kind of interaction with the solution set.
>
> However, as a user, I would like to have a more flexible way of interacting
> with the state of the iteration.
> In the program I'm describing above, I actually want to join on the value
> of the solution set, not the key.
> It would be nice to somehow have access to the solution set as any other
> normal DataSet.
>
> I'm not sure how this could be supported, but if you think this is a good
> idea, I could work on this!
>
> Cheers,
> V.
>
>
>
> On 31 July 2014 15:50, Stephan Ewen <se...@apache.org> wrote:
>
> > Hi Vasia!
> >
> > There is no fundamental reason, we simply have not gotten around to
> > implementing it, yet. Any help along these lines is highly welcome.
> >
> > One reason that held us back is that we need to make sure that the key of
> > the solution set and the key of the join are the same.
> > That is hard to verify with general functions. One approach is to actually
> > change the delta iteration API to define the keys only at
> > one place (the definition of the iteration), and offer special
> > "joinWithSolution" and "coGroupWithSolution" functions, rather than using
> > the regular join syntax (which allows creating invalid constructs).
> >
> > What are your thoughts on this, from a DeltaIteration user perspective?
> >
> > Greetings,
> > Stephan
> >
>
