Hi Matt,

my view is that if you use Infinispan to distribute tasks to workers for processing (master-slave processing), then Infinispan is the wrong tech for the job.

Assuming I understood what you wrote below, you're distributing tasks to be executed to nodes identified by the consistent hash over the task data you insert into the cache. Workers get a notification when tasks are added to their cache, process them and remove the tasks when done. This is a system with 90% writes (task insertions and removals) and 10% reads (peeking into the cache to see if tasks are available). Infinispan was designed to be a cache for frequently used data, in which reads outnumber writes (perhaps 80% reads to 20% writes), so your task distribution uses Infinispan in a way that's opposite to what it was designed for. It will certainly work, but I don't think you'll get great performance. I've seen similar abuses before, where people used Infinispan/JBossCache as a notification bus.

For the scenario you describe below I recommend a messaging system like JGroups or JMS. I've implemented a peer-to-peer task distribution system [1] in JGroups before; take a look, perhaps this is closer to what you have in mind...

[1] http://www.jgroups.org/taskdistribution.html
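To give a rough idea of the messaging style I mean, here's a minimal sketch (this is not the actual code behind [1]; the TaskBus class, the task payload and the cluster name are made up for illustration):

import java.io.Serializable;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;

// Sketch: a submitter multicasts tasks over a JChannel; every worker sees
// them in receive() and decides locally whether it should execute them.
public class TaskBus extends ReceiverAdapter {
    private JChannel ch;

    public void start() throws Exception {
        ch = new JChannel();          // default UDP stack
        ch.setReceiver(this);
        ch.connect("task-cluster");   // cluster name is arbitrary
    }

    public void submit(Serializable task) throws Exception {
        // (dest, src, payload); null dest means send to all members
        ch.send(new Message(null, null, task));
    }

    @Override
    public void receive(Message msg) {
        Object task = msg.getObject();
        // e.g. hash the task's id over the current view to decide whether
        // this node is responsible; [1] describes a complete scheme,
        // including failover when the responsible node crashes
        System.out.println("received task " + task);
    }
}

The appeal of this model for your scenario is that submitting a task is just a cheap send; there's no cache state to create, replicate and remove per task.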
On 3/2/13 6:40 PM, matt hoffman wrote:
> Hey guys,
>
> I've been working on a prototype of integrating Infinispan into our
> app. We do a lot of distributed processing across a small cluster, so
> I've played with Infinispan's existing distributed execution framework
> (which is nice), as well as using Infinispan alongside a normal message
> queue to distribute tasks. But I've also put together a prototype of a
> new distributed execution framework using fork-join pools that you all
> might be interested in. If it sounds like something that would be
> worthwhile for Infinispan, I can raise a Jira and submit a pull request
> with what I have so far. I'd need to get the CA and company policy stuff
> finalized; that might take a couple days. Meanwhile, in case there is
> any interest, I've described the approach I've taken below.
>
> First, a little background:
>
> A while back I worked on a side project that integrated a distributed
> work-stealing algorithm into the standard JDK fork-join queue. It used
> JGroups for communication, because it was quick and easy for
> prototyping. So this week I thought I'd take a stab at porting that over
> to Infinispan. The algorithm I came up with for Infinispan is a bit less
> of a work-stealing algorithm, to take advantage of Infinispan's built-in
> distribution capabilities, but I think it's still fairly efficient.
>
> My basic approach was to take in a cache in the constructor, much like
> the existing distributed executor, and then create a parallel, DIST-mode
> cache that uses the same hash & grouping configuration as the original
> cache. That new parallel cache is the "task cache", and we use that to
> distribute available tasks across the cluster. It's a distributed cache
> so that tasks are partitioned across a large cluster, and it uses the
> hashing config of the original cache and a KeyAffinityService to attempt
> to distribute the tasks to the same nodes that contain the data being
> worked on. Nodes use cache listeners to be notified when there is new
> work available, and the atomic replace() to "check out" the tasks for
> execution and "check in" the results.
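If I'm reading that check-out/check-in step right, it boils down to a conditional replace() used as a cluster-wide compare-and-swap per key. A minimal sketch of that hand-off; TaskEntry and Status are invented names, not your prototype's classes, while @Listener, @CacheEntryCreated and replace() are the real Infinispan APIs:

import org.infinispan.Cache;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;

// Sketch of the hand-off: a listener spots new tasks, and the conditional
// replace() acts as a compare-and-swap so at most one node wins each task.
@Listener
public class TaskPickupListener {

    public enum Status { AVAILABLE, RUNNING, DONE }

    public static class TaskEntry implements java.io.Serializable {
        public final Status status;
        public TaskEntry(Status status) { this.status = status; }
        // replace(key, expected, update) compares with equals(), so values
        // stored in the task cache need a meaningful equals()
        @Override public boolean equals(Object o) {
            return o instanceof TaskEntry && ((TaskEntry) o).status == status;
        }
        @Override public int hashCode() { return status.hashCode(); }
    }

    private final Cache<String, TaskEntry> tasks;

    public TaskPickupListener(Cache<String, TaskEntry> tasks) {
        this.tasks = tasks;
    }

    @CacheEntryCreated
    public void onTaskAdded(CacheEntryCreatedEvent<String, TaskEntry> event) {
        if (event.isPre()) return;             // react to the post-event only
        String key = event.getKey();
        TaskEntry seen = tasks.get(key);
        if (seen != null && seen.status == Status.AVAILABLE &&
            tasks.replace(key, seen, new TaskEntry(Status.RUNNING))) {
            // we won the check-out; a real impl would hand the task to the
            // FJ pool here rather than run it on the notification thread,
            // then "check in" the result:
            tasks.replace(key, new TaskEntry(Status.RUNNING),
                          new TaskEntry(Status.DONE));
        }
    }
}

Registered with taskCache.addListener(new TaskPickupListener(taskCache)). Note that even in this stripped-down form every task costs several cluster-wide writes (insert, check-out, check-in, removal), which is exactly the write-heavy pattern I'm worried about.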
> The basic algorithm is something like this:
>
> For a refresher, a normal FJ pool has a fork() method that takes in a
> task, and then places that task on an internal queue (actually, one of
> several queues). When threads are idle, they look to the nearest work
> queue for work. If that work queue does not have work, they "steal" work
> from another thread's queue. So in the best case, tasks remain on the
> same thread as the task that spawned them, and tasks that process the
> same data as their parents may still have that data in the CPU's cache,
> etc. There's more to it than that, but that's the basic idea.
>
> This distributed algorithm just adds an extra layer on top for tasks
> that are marked "distributable" (by extending DistributedFJTask instead
> of the normal ForkJoinTask). When you call fork() with a
> DistributedFJTask, it first checks to see if the local pool's work queue
> is empty. If so, we just go ahead and submit it locally; there's no
> reason to distribute it. If not, we put the task in the task cache and
> let Infinispan distribute it. When a node has no more work to do in its
> internal fork-join queues, it looks at the task cache and tries to pull
> work from there.
>
> So it isn't really a "work-stealing" algorithm, per se; the
> distributable tasks are distributed eagerly using Infinispan's normal
> cache distribution. But I'm hoping that doing that also makes it easier
> to handle node failure, since nodes collectively share a common picture
> of the work to be done.
>
> This approach required one change to the actual FJ classes themselves
> (in org.infinispan.util.concurrent.jdk8backported). That's probably the
> most controversial change: I had to make the original ForkJoinTask's
> fork() method non-final in order to extend it cleanly. There's probably
> a way around that, but that's the cleanest option I have thought of thus
> far.
>
> And lastly, it's not done yet: basic task distribution is working, but I
> haven't tackled failover to any real extent yet. The biggest questions,
> though, are around what to do with the existing distributed execution
> interfaces. For example, DistributedTask has a getCallable() method
> because it assumes it's wrapping a Callable, but ForkJoinTasks don't
> extend Callable. I could put in a shim that wraps DistributedFJTasks in
> Callables for the sake of that method, but I don't know if it's worth
> it. Similarly, the DistributedExecutorService interface exposes a lot of
> submit-to-specific-address or submit-to-all-addresses methods, which are
> an odd fit here since tasks are distributed via their own cache. Even if
> I used a KeyAffinityService to target the task to the given Address, it
> might get picked up by another node that shares that same hash. But I
> can add a direct-to-single-Address capability if that seems worthwhile.
> Alternatively, I can just use entirely different interfaces
> (DistributedFJExecutorService, DistributedFJTask?).
>
> Thoughts? Concerns? Glaring issues?

--
Bela Ban, JGroups lead (http://www.jgroups.org)
