Hi Matt.

Thanks for resurrecting and sorry for not responding on the original thread.

Interesting discussions.  I would intuitively agree with Bela were it not for 
the fact that, as you said, you're actually storing data, require high 
availability of that data during server failure, and may possibly chain tasks 
(as per a local fork-join) and want to maintain server affinity across related 
tasks.

You could achieve all of this with JGroups but would end up re-implementing 
certain bits of Infinispan, specifically:
- task failover
- task cancelling
- consistent hash based node selection for delivery, ensuring affinity across 
data and tasks

So I think it is a pretty valid use case to make use of what Infinispan already 
has.

I'll have a look at your sources and comment more.  I'm sure Paolo and others 
at CloudTM will probably do the same.

Cheers
Manik

On 9 May 2013, at 01:50, Matt Hoffman <[email protected]> wrote:

> Resurrecting this topic:  I've put some sample code on my fork here:  
> https://github.com/matthoffman/infinispan/tree/dfj
> There's a README there that offers an overview. I hope to do some performance 
> comparisons over the weekend, but in the meantime, the code is there if 
> you're curious. 
> 
> I'm mainly interested whether this is the type of thing that Infinispan may 
> be interested in, as an alternative to the current distributed executor (much 
> like the ForkJoinPool in the JDK is an alternative to a traditional thread 
> pool). 
> If so, the next steps would be to do some performance tests, pick one 
> implementation to move forward with (out of the 3 prototypes on that branch) 
> and clean it up sufficiently to be considered for a pull request. 
> 
> If its not something that Infinispan is interested in, then I'll change 
> approaches and make it more generic, so that it isn't Infinispan-specific and 
> can be used with other transports. I'm open to either option... I think there 
> are pros and cons either way. 
> 
> 
> Thanks!
> 
> 
> matt
> 
> 
> 
> On Mon, Mar 4, 2013 at 7:07 AM, Paolo Romano <[email protected]> wrote:
> This sounds really interesting Matt. In the Cloud-TM project (www.cloudtm.eu) 
> we are currently developing a parallel graph-analysis algorithm on top of 
> Infinispan's DEF. I would be really curious to take a look at the framework 
> you developed, and see how it could be exploited in our application.
> 
> Regards,
> 
>     Paolo
> 
> 
> -- 
> 
> Paolo Romano, PhD
> Coordinator of the Cloud-TM ICT FP7 Project (www.cloudtm.eu)
> Senior Researcher @ INESC-ID (www.inesc-id.pt)
> Assistant Professor @ Instituto Superior Tecnico (www.ist.utl.pt)
> Rua Alves Redol, 9
> 1000-059, Lisbon Portugal
> Tel. + 351 21 3100300
> Fax  + 351 21 3145843
> Webpage http://www.gsd.inesc-id.pt/~romanop
> 
> 
> On 3/2/13 5:40 PM, matt hoffman wrote:
>> Hey guys, 
>> 
>> I've been working on a prototype of integrating Infinispan into our app.  We 
>> do a lot of distributed processing across a small cluster, so I've played 
>> with Infinispan's existing distributed execution framework (which is nice), 
>> as well as using Infinispan alongside a normal message queue to distribute 
>> tasks.  But I've also put together a prototype of a new distributed 
>> execution framework using fork-join pools that you all might be interested 
>> in.  If it sounds like something that would be worthwhile for Infinispan, I 
>> can raise a Jira and submit a pull request with what I have so far. I'd need 
>> to get the CA and company policy stuff finalized; that might take a couple 
>> days. Meanwhile, in case there is any interest, I've described the approach 
>> I've taken below.
>> 
>> First, a little background:
>> 
>> A while back I worked on a side project that integrated a distributed 
>> work-stealing algorithm into the standard JDK fork-join queue.  It used 
>> JGroups for communication, because it was quick and easy for prototyping. So 
>> this week I thought i'd take a stab at porting that over to Infinispan. The 
>> algorithm I came up with for Infinispan is a bit less of a work-stealing 
>> algorithm, to take advantage of Infinispan's built-in distribution 
>> capabilities, but I think it's still fairly efficient. 
>> 
>> My basic approach was to take in a cache in the constructor, much like the 
>> existing distributed executor, and then create a parallel, DIST-mode cache 
>> that uses the same hash & grouping configuration as the original cache. That 
>> new parallel cache is the "task cache", and we use that to distribute 
>> available tasks across the cluster. It's a distributed cache so that tasks 
>> are partitioned across a large cluster, and it uses the hashing config of 
>> the original cache and a KeyAffinityService to attempt to distribute the 
>> tasks to the same nodes that contain the data being worked on. Nodes use 
>> cache listeners to be notified when there is new work available, and the 
>> atomic replace() to "check out" the tasks for execution, and "check in" the 
>> results.   
>> 
>> The basic algorithm is something like this: 
>> 
>> For a refresher, a normal FJ pool has a fork() method that takes in a task, 
>> and then places that task on an internal queue (actually, one of several 
>> queues). When threads are idle, they look to the nearest work queue for 
>> work. If that work queue does not have work, they "steal" work from another 
>> thread's queue.  So in the best case, tasks remain on the same thread as the 
>> task that spawned them, so tasks that process the same data as their parents 
>> may still have that data in the CPU's cache, etc. There's more to it than 
>> that, but that's the basic idea.  
>> 
>> This distributed algorithm just adds an extra layer on top for tasks that 
>> are marked "distributable" (by extending DistributedFJTask instead of the 
>> normal ForkJoinTask). When you call fork() with a DistributedFJTask, it 
>> first checks to see if the local pool's work queue is empty. If so, we just 
>> go ahead and submit it locally; there's no reason to distribute it. If not, 
>> we put the task in the task cache, and let Infinispan distribute it. When a 
>> node has no more work to do in its internal fork-join queues, it looks at 
>> the task cache and tries to pull work from there. 
>> 
>> So, it isn't really a "work-stealing" algorithm, per se; the distributable 
>> tasks are being distributed eagerly using Infinispan's normal cache 
>> distribution. But I'm hoping that doing that also makes it easier to handle 
>> node failure, since nodes collectively share a common picture of the work to 
>> be done.
>> 
>> This approach required one change to the actual FJ classes themselves (in 
>> org.infinispan.util.concurrent.jdk8backported). That's probably the most 
>> controversial change. I had to make the original ForkJoinTask's fork() 
>> method non-final in order to extend it cleanly.  There's probably a way 
>> around that, but that's the cleanest option I have thought of thus far. 
>> 
>> And lastly, it's not done yet: basic task distribution is working, but I 
>> haven't tackled failover to any real extent yet. The biggest questions, 
>> though, are around what to do with the existing distributed execution 
>> interfaces. For example, DistributedTask has a getCallable() method because 
>> it assumes it's wrapping a Callable. But ForkJoinTasks don't extend 
>> Callable. I could put in a shim to wrap the DistributedFJTasks into 
>> Callables for the sake of that method, but I don't know if it's worth it.  
>> Similarly, the DistributedExecutorService interface exposes a lot of 
>> submit-to-specific-address or submit-to-all-addresses methods, which are an 
>> odd fit here since tasks are distributed via their own cache.  Even if I 
>> used a KeyAffinityService to target the task to the given Address, it might 
>> get picked up by another node that shares that same hash. But I can add in a 
>> direct-to-single-Address capability in if that seems worthwhile. 
>> Alternately, I can just use entirely different interfaces 
>> (DistributedFJExecutorService, DistributedFJTask?). 
>> 
>> Thoughts?  Concerns?  Glaring issues? 
>> 
>> 
>> 
>> _______________________________________________
>> infinispan-dev mailing list
>> [email protected]
>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
> 
> 
> _______________________________________________
> infinispan-dev mailing list
> [email protected]
> https://lists.jboss.org/mailman/listinfo/infinispan-dev
> 
> _______________________________________________
> infinispan-dev mailing list
> [email protected]
> https://lists.jboss.org/mailman/listinfo/infinispan-dev

--
Manik Surtani
[email protected]
twitter.com/maniksurtani

Platform Architect, JBoss Data Grid
http://red.ht/data-grid

_______________________________________________
infinispan-dev mailing list
[email protected]
https://lists.jboss.org/mailman/listinfo/infinispan-dev

Reply via email to