On 13 Oct 2014, at 10:45, Dan Berindei <[email protected]> wrote:
>
> On Fri, Oct 10, 2014 at 6:49 PM, Emmanuel Bernard <[email protected]>
> wrote:
> When wrestling with the subject, here is what I had in mind.
>
> The M/R coordinator node sends one M task per segment to the node where
> the segment is primary.
>
> What's M? Is it just a shorthand for "map", or is it a new parameter that
> controls the number of map/combine tasks sent at once?
M is short for Map. Sorry.
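To make that concrete, here is roughly the coordinator-side loop I have in
mind. I am quoting the ConsistentHash methods from memory, and submitMapTask()
is an invented placeholder for whatever command/RPC plumbing we end up with:

    import java.util.UUID;
    import org.infinispan.AdvancedCache;
    import org.infinispan.distribution.ch.ConsistentHash;
    import org.infinispan.remoting.transport.Address;

    // Coordinator-side sketch: one M (map/combine) task per segment, shipped
    // to the primary owner of that segment.
    public class PerSegmentDispatcher {
       void dispatchPerSegment(AdvancedCache<?, ?> inputCache, UUID mrTaskId) {
          ConsistentHash ch = inputCache.getDistributionManager().getConsistentHash();
          for (int segment = 0; segment < ch.getNumSegments(); segment++) {
             Address primary = ch.locatePrimaryOwnerForSegment(segment);
             // one RPC per segment; the task on the remote node only iterates
             // the entries belonging to this segment
             submitMapTask(primary, mrTaskId, segment);
          }
       }

       void submitMapTask(Address target, UUID mrTaskId, int segment) {
          // invented: wrap in a command and send it via the transport
       }
    }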
>
> Each "per-segment" M task is executed and is offered the way to push
> intermediary results in a temp cache.
>
> Just to be clear, the user-provided mapper and combiner don't know anything
> about the intermediary cache (which doesn't have to be temporary, if it's
> shared by all M/R tasks). They only interact with the Collector interface.
> The map/combine task on the other hand is our code, and it deals with the
> intermediary cache directly.
Interesting. Evangelos, do you actually use the Collector interface or explicit
intermediary caches in your approach?
If it's the Collector interface, I guess that makes it easier to hide that
sharding business.
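For the archives, this is the user-facing contract we are talking about, the
classic word count; nothing segment-related leaks through, which is why the
sharding can stay our business:

    import org.infinispan.distexec.mapreduce.Collector;
    import org.infinispan.distexec.mapreduce.Mapper;

    // User-provided mapper: it only ever talks to the Collector, and has no
    // idea whether emitted pairs land in a shared intermediary cache, in
    // per-segment sub-keys, or anywhere else.
    public class WordCountMapper implements Mapper<String, String, String, Integer> {
       @Override
       public void map(String key, String value, Collector<String, Integer> collector) {
          for (String word : value.split("\\s+")) {
             collector.emit(word, 1);
          }
       }
    }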
>
> The intermediary results are stored with a composite key [intermKey-i, seg-j].
> The M/R coordinator waits for all M tasks to return. If one does not
> (timeout, rehash), the following happens:
>
> We can't just time out map tasks, or they will keep writing to the
> intermediate cache in parallel with the retried tasks. So the originator has
> to wait for a response from each node to which it sent a map task.
OK. I guess the originator can still see that a node has left the cluster and
act accordingly.
>
> - delete [intermKey-i, seg-i] (that operation could be handled by the
> new per-segment M task before the map task effectively starts)
> - ship the M task for that segment-i to the new primary owner of
> segment-i
>
> When all M task results are received, the Reduce phase will read all
> [intermKey-i, *] keys and reduce them.
> Note that if the reduction phase is itself distributed, we could apply
> the same per-segment key split and task shipping for the reduce tasks as well.
>
> Sure, we have to retry reduce tasks when the primary owner changes, and it
> makes sense to retry as little as possible.
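To check my understanding of the "[intermKey-i, *]" read: on the reduce side I
picture something like the sketch below. IntermediateKey is an invented class
(equals()/hashCode() and an Externalizer are omitted), the Reducer signature is
the existing one:

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;
    import org.infinispan.Cache;
    import org.infinispan.distexec.mapreduce.Reducer;

    // Invented composite key for intermediary values: [intermKey, segment].
    // Real code would also need equals()/hashCode() and an Externalizer.
    public class IntermediateKey<K> implements Serializable {
       final K intermKey;
       final int segment;

       public IntermediateKey(K intermKey, int segment) {
          this.intermKey = intermKey;
          this.segment = segment;
       }

       // Reduce-side sketch of the "[intermKey-i, *]" read: concatenate the
       // per-segment value lists for one intermediate key and hand the result
       // to the user's Reducer, which never sees the segment sub-keys.
       public static <K, V> V reduceOneKey(Cache<IntermediateKey<K>, List<V>> intermediaryCache,
                                           K intermKey, int numSegments, Reducer<K, V> reducer) {
          List<V> allValues = new ArrayList<V>();
          for (int segment = 0; segment < numSegments; segment++) {
             List<V> perSegment = intermediaryCache.get(new IntermediateKey<K>(intermKey, segment));
             if (perSegment != null) {
                allValues.addAll(perSegment);
             }
          }
          return reducer.reduce(intermKey, allValues.iterator());
       }
    }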
>
>
> Again, the tricky part is to expose the ability to write to intermediary
> caches per segment without exposing segments per se, and to let someone see a
> concatenated view of intermKey-i across all segment sub-keys during reduction.
>
> Writing to and reading from the intermediate cache is already abstracted from
> user code (in the Mapper and Reducer interfaces). So we don't need to worry
> about exposing extra details to the user.
>
>
> Thoughts?
>
> Dan, I did not quite get what alternative approach you wanted to
> propose. Care to respin it for a slow brain? :)
>
> I think where we differ is that I don't think user code needs to know about
> how we store the intermediate values and what we retry, as long as their
> mappers/combiners/reducers don't have side effects.
Right, but my understanding from the LEADS guys was that they have side effects
in their M/R tasks. Waiting for Evangelos to speak up.
>
> Otherwise I was thinking on the same lines: send 1 map/combine task for each
> segment (maybe with a cap on the number of segments being processed at the
> same time on each node), split the intermediate values per input segment,
> cancel+retry each map task if the topology changes and the executing node is
> no longer an owner. If the reduce phase is distributed, run 1 reduce task per
> segment as well, and cancel+retry the reduce task if the executing node is no
> longer an owner.
>
> I had some ideas about assigning each map/combine phase a UUID and making the
> intermediate keys [intermKey, seg, mctask] to allow the originator to retry a
> map/combine task without waiting for the previous one to finish, but I don't
> think I mentioned that before :)
Nice touch, that fixes the rogue node / timeout problem.
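If I read that right, the originator then only needs to remember which mctask
UUID it currently accepts per segment, and anything written under an older UUID
is dead data that the reduce phase simply never reads. A tiny sketch of that
bookkeeping (names invented):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;

    // Originator-side bookkeeping sketch for [intermKey, seg, mctask] keys.
    // A retried segment gets a fresh UUID; a timed-out or rogue task keeps
    // writing only under its stale UUID, which is never read back.
    public class AcceptedMapTasks {
       private final Map<Integer, UUID> acceptedTaskPerSegment = new HashMap<Integer, UUID>();

       // Called when (re)shipping the map/combine task for a segment.
       public UUID newAttempt(int segment) {
          UUID freshId = UUID.randomUUID();
          acceptedTaskPerSegment.put(segment, freshId); // previous id becomes garbage
          return freshId;
       }

       // The reduce phase only reads intermediate keys whose mctask id is accepted.
       public boolean isAccepted(int segment, UUID mcTaskId) {
          return mcTaskId.equals(acceptedTaskPerSegment.get(segment));
       }
    }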
> There are also some details that I'm worried about:
>
> 1) If the reduce phase is distributed, and the intermediate cache is
> non-transactional, any topology change in the intermediate cache will require
> us to retry all the map/combine tasks that were running at the time on any
> node (even if some nodes did not detect the topology change yet). So it would
> make sense to limit the number of map/combine tasks that are processed at one
> time, in order to limit the number of tasks we retry (OR require the
> intermediate cache to be transactional).
I am not fully following that. What matters in the end, it seems, is for the
originator to detect a topology change and discard things accordingly, no? If
the other nodes act as slaves of that originator for the purpose of that M/R,
we are good.
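Concretely, what I have in mind on the originator is something like the sketch
below: snapshot a topology id when the M phase starts, re-check it when each
per-segment task completes, and discard + re-ship on mismatch. The helper
methods are invented stand-ins; the point is only the compare-and-discard flow:

    // Originator-side sketch of "detect a topology change and discard accordingly".
    public class TopologyGuard {
       private final int startTopologyId;

       public TopologyGuard(int startTopologyId) {
          this.startTopologyId = startTopologyId;
       }

       // Called when the per-segment map/combine task reports completion.
       public void onMapTaskCompleted(int segment, int currentTopologyId) {
          if (currentTopologyId != startTopologyId) {
             // Owners may have moved while the task ran: throw away that
             // segment's intermediate values and re-ship to the new primary owner.
             deleteIntermediateValues(segment);
             resubmitToNewPrimaryOwner(segment);
          }
       }

       private void deleteIntermediateValues(int segment) { /* invented */ }
       private void resubmitToNewPrimaryOwner(int segment) { /* invented */ }
    }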
>
> 2) Running a separate map/combine task for each segment is not really an
> option until we implement the segment-aware data container and cache
> stores. Without that change, it will make everything much slower, because of
> all the extra iterations for each segment.
>
See my other email about physically merging the per-segment work into per-node
work when shipping it.
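i.e. keep the per-segment split logically, but collapse it into one RPC per
primary owner, something along these lines (ConsistentHash methods from memory,
submitMapTasks() invented):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;
    import org.infinispan.AdvancedCache;
    import org.infinispan.distribution.ch.ConsistentHash;
    import org.infinispan.remoting.transport.Address;

    // Same per-segment bookkeeping as before, but physically one RPC per node:
    // group segments by primary owner and ship each group in a single command.
    public class PerNodeDispatcher {
       void dispatchPerNode(AdvancedCache<?, ?> inputCache, UUID mrTaskId) {
          ConsistentHash ch = inputCache.getDistributionManager().getConsistentHash();
          Map<Address, Set<Integer>> segmentsByPrimary = new HashMap<Address, Set<Integer>>();
          for (int segment = 0; segment < ch.getNumSegments(); segment++) {
             Address primary = ch.locatePrimaryOwnerForSegment(segment);
             Set<Integer> segments = segmentsByPrimary.get(primary);
             if (segments == null) {
                segments = new HashSet<Integer>();
                segmentsByPrimary.put(primary, segments);
             }
             segments.add(segment);
          }
          for (Map.Entry<Address, Set<Integer>> entry : segmentsByPrimary.entrySet()) {
             // one command per node; the receiver still reports results per
             // segment, so retries after a rehash stay per-segment
             submitMapTasks(entry.getKey(), mrTaskId, entry.getValue());
          }
       }

       void submitMapTasks(Address target, UUID mrTaskId, Set<Integer> segments) {
          // invented: send a single command covering all these segments
       }
    }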
> 3) And finally, all this will be overkill when the input cache is small, and
> the time needed to process the data is comparable to the time needed to send
> all those extra RPCs.
>
> So I'm thinking it might be better to adopt Vladimir's suggestion to retry
> everything if we detect a topology change in the input and/or intermediate
> cache at the end of the M/R task, at least in the first phase.
You half lost me there, but I think that with my proposal to physically merge
the RPC calls per node instead of per segment, that problem would be alleviated.
Emmanuel