On Mon, Feb 24, 2014 at 10:55 PM, Vladimir Blagojevic <[email protected]> wrote:

> See inline
>
> On 2/24/2014, 12:57 PM, Mircea Markus wrote:
> > On Feb 19, 2014, at 8:45 PM, Vladimir Blagojevic <[email protected]> wrote:
> >
> >> Hey guys,
> >>
> >> As some of you might know, we have received additional requirements from
> >> the community and internally to add a few things to dist. executors and
> >> the map/reduce API. On the distributed executors front we need to enable
> >> distributed executors to store results into a cache directly rather than
> >> returning them to the invoker [1]. As soon as we introduce this API we
> >> also need an async mechanism to allow notifications of subtask
> >> completion/failure.
> >
> > I think we need both to go in at the same time :-)
>
> Yes, that is what I actually meant. Poor wording.

Do we really need special support for distributed tasks to write results to
another cache? We already allow a task to do
cache.getCacheManager().getCache("outputCache").put(k, v)
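To illustrate, something along these lines should already work today (a
rough, untested sketch; the task class, the cache name and the key/value
types are all made up for the example):

import java.io.Serializable;
import java.util.Set;

import org.infinispan.Cache;
import org.infinispan.distexec.DistributedCallable;

// Rough sketch only: a task that writes its per-key results straight into
// another cache instead of returning them to the invoker.
public class StoreToOutputCacheTask
      implements DistributedCallable<String, String, Void>, Serializable {

   private transient Cache<String, String> cache;
   private transient Set<String> inputKeys;

   @Override
   public void setEnvironment(Cache<String, String> cache, Set<String> inputKeys) {
      this.cache = cache;
      this.inputKeys = inputKeys;
   }

   @Override
   public Void call() throws Exception {
      Cache<String, Integer> outputCache =
            cache.getCacheManager().getCache("outputCache");
      for (String key : inputKeys) {
         String value = cache.get(key);
         // store the result directly, keyed however the task sees fit
         outputCache.put(key, value == null ? 0 : value.length());
      }
      return null;
   }
}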
> >> I was thinking we add a concept of
> >> DistributedTaskExecutionListener which can be specified in
> >> DistributedTaskBuilder:
> >>
> >> DistributedTaskBuilder<T>
> >>    executionListener(DistributedTaskExecutionListener<K, T> listener);
> >>
> >> We needed DistributedTaskExecutionListener anyway. All distributed tasks
> >> might use some feedback about task progress, completion/failure and so on.
> >> My proposal is roughly:
> >>
> >> public interface DistributedTaskExecutionListener<K, T> {
> >>
> >>    void subtaskSent(Address node, Set<K> inputKeys);
> >>    void subtaskFailed(Address node, Set<K> inputKeys, Exception e);
> >>    void subtaskSucceeded(Address node, Set<K> inputKeys, T result);
> >>    void allSubtasksCompleted();
> >>
> >> }
> >>
> >> So much for that.
> >
> > I think it would make sense to add this logic for monitoring, plus
> > additional info such as average execution time etc. I'm not sure if this
> > is a generally useful API though, unless there were people asking for it
> > already?
>
> Ok, noted. If you remember any references about this let me know and
> I'll incorporate what people actually asked for rather than guess.

Ok, let's wait until we get some actual requests from users then. TBH I
don't think distributed tasks with subtasks are something that users care
about. E.g. with Map/Reduce the reduce tasks are not subtasks of the
map/combine tasks, so this API wouldn't help. Hadoop has a Reporter
interface that allows you to report "ticks" and increment counters; maybe
we should add something like that instead?
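To make that concrete, I mean roughly something like this (just a sketch,
all the names are invented, not a proposal for the final API):

// Sketch of a Hadoop-Reporter-style callback the framework could hand to a
// task. All names here are invented for illustration.
public interface TaskProgressReporter {

   // free-form status update / "tick" to signal the task is still making progress
   void progress(String status);

   // increment a named counter, e.g. "entries-processed", aggregated per task
   void incrementCounter(String counterName, long delta);
}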
> >> If tasks do not use input keys these parameters would
> >> be empty sets. Now for [1] we need to add additional methods to
> >> DistributedExecutorService. We cannot specify the result cache in
> >> DistributedTaskBuilder, as we are still bound to only the submit methods
> >> in DistributedExecutorService that return futures, and we don't want that.
> >> We need two new void methods:
> >>
> >> <T, K> void submitEverywhere(DistributedTask<T> task,
> >>    Cache<DistExecResultKey<K>, T> result);
> >> <T, K> void submitEverywhere(DistributedTask<T> task,
> >>    Cache<DistExecResultKey<K>, T> result, K... input);
> >>
> >> Now, why bother with DistExecResultKey? Well, we have tasks that use
> >> input keys and tasks that don't, so the results cache could only be keyed
> >> by either keys or execution address, or a combination of those two.
> >> Therefore, DistExecResultKey could be something like:
> >>
> >> public interface DistExecResultKey<K> {
> >>
> >>    Address getExecutionAddress();
> >>    K getKey();
> >>
> >> }
> >>
> >> If you have a better idea how to address this aspect, let us know. So
> >> much for distributed executors.

I think we should allow each distributed task to deal with output in its own
way; the existing API should be enough.

> >> For map/reduce we also have to enable storing of map/reduce task results
> >> into a cache [2] and allow users to specify a custom cache for
> >> intermediate results [3]. Part of task [2] is to allow notification about
> >> map/reduce task progress and completion. Just as in dist. executor I
> >> would add a MapReduceTaskExecutionListener interface:
> >>
> >> public interface MapReduceTaskExecutionListener {
> >>
> >>    void mapTaskInitialized(Address executionAddress);
> >>    void mapTaskSucceeded(Address executionAddress);
> >>    void mapTaskFailed(Address executionTarget, Exception cause);
> >>    void mapPhaseCompleted();
> >>
> >>    void reduceTaskInitialized(Address executionAddress);
> >>    void reduceTaskSucceeded(Address executionAddress);
> >>    void reduceTaskFailed(Address address, Exception cause);
> >>    void reducePhaseCompleted();
> >>
> >> }
> >
> > IMO - in the first stage at least - I would rather use a simpler
> > (Notifying)Future, on which the user can wait till the computation
> > happens: it's simpler and more aligned with the rest of our async API.
>
> What do you mean? We already have futures in the MapReduceTask API. This
> API is more fine grained and allows monitoring/reporting of task progress.
> Please clarify.

I'm not sure about the usefulness of an API like this either... if the
intention is to allow the user to collect statistics about the duration of
various phases, then I think exposing the durations via MapReduceTask would
be better.

> >> while MapReduceTask would have an additional method:
> >>
> >> public void execute(Cache<KOut, VOut> resultsCache);
> >
> > You could overload it with a cache-name-only method.
>
> Yeah, good idea. Same for usingIntermediateCache? I actually asked you
> this here: https://issues.jboss.org/browse/ISPN-4021

+1 to allowing a cache name only. For the intermediate cache I don't think
it makes sense to allow a Cache version at all.
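i.e. something along these lines on MapReduceTask (sketch only, the exact
signatures are of course up for discussion):

// Sketch of the cache-name-only variants, next to the proposed
// execute(Cache<KOut, VOut>) method. Signatures are illustrative, not final.
public abstract class MapReduceTaskSketch<KIn, VIn, KOut, VOut> {

   // store the final KOut/VOut results into the named cache
   public abstract void execute(String resultsCacheName);

   // use the named cache to hold the intermediate KOut -> List<VOut> entries
   public abstract MapReduceTaskSketch<KIn, VIn, KOut, VOut>
         usingIntermediateCache(String intermediateCacheName);
}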
_______________________________________________
infinispan-dev mailing list
[email protected]
https://lists.jboss.org/mailman/listinfo/infinispan-dev
