Thanks everyone for the feedback. I will maintain the design/requirements document at https://community.jboss.org/wiki/Infinispan60-MapReduceEnhancements
If anything remains unclear, do tell me. Keep the feedback coming.

Regards,
Vladimir

P.S. I will also create another document covering enhancements to both
distributed executors and map reduce, mainly fail-over, topology
awareness, task interruption/cancellation, etc.

On 12-02-14 10:19 AM, Vladimir Blagojevic wrote:
> Hey guys,
>
> Before moving forward with the next iteration of map reduce I wanted to
> hear your thoughts about the following proposal. After we agree on the
> general direction I will transcribe the agreed design on a wiki page and
> start implementation.
>
>
> Shortcoming of current map reduce implementation
>
> While our current map reduce implementation is more than a proof of
> concept, there are several drawbacks preventing it from being an
> industrial-grade map reduce solution. The main drawback is the inability
> of the current solution to deal with large-data (GB/TB) map reduce
> problems. This shortcoming lies mainly in our reduce phase execution.
> The reduce phase, as you might know, is currently done on a single
> Infinispan master task node; the size of the map reduce problems we can
> support (data size wise) is therefore limited by the working memory of a
> single node.
>
>
> Proposed solution
>
> The proposed solution involves distributing execution of reduce phase
> tasks across the cluster, thus achieving higher reduce task
> parallelization and at the same time removing the above-mentioned reduce
> phase restriction. By leveraging our consistent hashing solution even
> further we can parallelize the reduce phase and elevate our map reduce
> solution to an industrial level. Here is how we can achieve that.
>
> Map phase
>
> MapReduceTask, as it currently does, will hash task input keys and group
> them by the execution node N they are hashed to. For each node N and its
> grouped input KIn keys, MapReduceTask creates a MapCombineCommand, which
> is migrated to the target execution node N.
> MapCombineCommand is similar to the current MapReduceCommand.
> MapCombineCommand takes an instance of a Mapper and an instance of a
> Reducer, which acts as a combiner [1].
>
> Once loaded onto the target execution node, MapCombineCommand takes each
> local KIn key and executes the Mapper method void map(KIn key, VIn value,
> Collector<KOut, VOut> collector). Results are collected in a common
> Collector<KOut, VOut> collector and the combine phase is initiated. A
> Combiner, if specified, takes the KOut keys and immediately invokes the
> reduce phase on them. The result of the mapping phase executed on each
> node is a <KOut, VOut> map M. There will be one resulting map M per
> execution node N.
>
> At the end of the combine phase, instead of returning map M to the master
> task node (as we currently do), we now hash each KOut in map M and group
> KOut keys by the execution node N they are hashed to. Each group of KOut
> keys and its VOut values, hashed to the same node, is wrapped in a new
> command, Migrate. Command Migrate, which is very similar to
> PutKeyValueCommand, is executed on the Infinispan target node N and
> essentially maintains a KOut K -> List<VOut> mapping, i.e. all KOut/VOut
> pairs from all executed MapCombineCommands will be collocated on the
> node N that KOut is hashed to, and the value for KOut will be a list of
> all VOut values. We essentially collect all VOut values under each KOut
> for all executed MapCombineCommands.
>
>
> At this point MapCombineCommand has finished its execution; the list of
> KOut keys is returned to the master node and its MapReduceTask. We do not
> return VOut values as we do not need them at the master task node.
> MapReduceTask is ready to start the reduce phase.
>
>
> Reduce phase
>
>
> MapReduceTask initializes ReduceCommand with a user-specified Reducer.
> The KOut keys collected from the map phase are grouped by the execution
> node N they are hashed to.
> For each node N and its grouped input KOut keys, MapReduceTask creates a
> ReduceCommand and sends it to the node N that the KOut keys are hashed
> to. Once loaded on the target execution node, ReduceCommand grabs the
> list of VOut values for each KOut key and invokes:
> VOut reduce(KOut reducedKey, Iterator<VOut> iter).
>
> The result of a ReduceCommand is a map M where each key is a KOut and
> each value is a VOut. Each Infinispan execution node N returns one map M
> where each key KOut is hashed to N and each VOut is KOut's reduced value.
>
> When all ReduceCommands return to the calling node, MapReduceTask simply
> combines all these M maps and returns the final Map<KOut, VOut> as the
> result of MapReduceTask. All intermediate KOut->List<VOut> maps left on
> the Infinispan cluster are then cleaned up.
>
>
> [1] See section 4.3 of http://research.google.com/archive/mapreduce.html
> _______________________________________________
> infinispan-dev mailing list
> [email protected]
> https://lists.jboss.org/mailman/listinfo/infinispan-dev
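
To make the quoted flow easier to follow, here is a rough single-process sketch of the three steps (map/combine per node, migrate each KOut to the node it hashes to, reduce per node and merge at the master). It is only an illustration under some loud assumptions: the class DistributedReduceSketch and its methods owner, reduce and run are hypothetical names, "nodes" are plain list indexes, and owner() uses a simple hashCode modulo rather than Infinispan's consistent hashing. No Infinispan API is used, and the step where only KOut keys travel back to the master is skipped because everything lives in one JVM here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory word-count sketch of the proposed flow; not Infinispan code. */
public class DistributedReduceSketch {

    // Hypothetical stand-in for consistent hashing: key -> owning node index.
    public static int owner(Object key, int nodes) {
        return Math.floorMod(key.hashCode(), nodes);
    }

    // Word-count reducer; the same function doubles as the combiner.
    public static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) sum += v;
        return sum;
    }

    public static Map<String, Integer> run(Map<String, String> input, int nodes) {
        // Input entries grouped by owning node (one MapCombineCommand per node).
        List<Map<String, String>> local = new ArrayList<>();
        // Per-node intermediate store KOut -> List<VOut>, filled by the "Migrate" step.
        List<Map<String, List<Integer>>> intermediate = new ArrayList<>();
        for (int n = 0; n < nodes; n++) {
            local.add(new HashMap<>());
            intermediate.add(new HashMap<>());
        }
        for (Map.Entry<String, String> e : input.entrySet()) {
            local.get(owner(e.getKey(), nodes)).put(e.getKey(), e.getValue());
        }

        // Map + combine on each node, then migrate each combined KOut to its owner.
        for (int n = 0; n < nodes; n++) {
            Map<String, List<Integer>> collected = new HashMap<>();
            for (String text : local.get(n).values()) {
                for (String word : text.split("\\s+")) {
                    if (!word.isEmpty()) {
                        collected.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
                    }
                }
            }
            for (Map.Entry<String, List<Integer>> e : collected.entrySet()) {
                int combined = reduce(e.getKey(), e.getValue());   // combine phase
                intermediate.get(owner(e.getKey(), nodes))          // migrate to owner of KOut
                        .computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                        .add(combined);
            }
        }

        // Reduce on each node over the KOut keys it owns, then merge the per-node maps.
        Map<String, Integer> result = new TreeMap<>();
        for (int n = 0; n < nodes; n++) {
            for (Map.Entry<String, List<Integer>> e : intermediate.get(n).entrySet()) {
                result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
            }
            intermediate.get(n).clear(); // clean up intermediate KOut -> List<VOut> data
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> input = new HashMap<>();
        input.put("doc1", "a b a");
        input.put("doc2", "b c");
        input.put("doc3", "a c c");
        System.out.println(run(input, 3)); // prints {a=3, b=2, c=3}
    }
}
```

The point of the sketch is that each KOut is reduced exactly once, on the node that owns it, so the per-node result maps have disjoint keys and the master only has to merge them. The final result does not depend on how many nodes there are or on where the input keys happen to land.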
