Very interesting stuff, Matt!

I find the design, which leverages Infinispan to distribute the tasks while maximizing locality and exploiting Infinispan's fault-tolerance capabilities, modular and elegant. It will be interesting to evaluate whether there is significant overhead compared with a purely JGroups-based mechanism.

We will be looking at your code, and may perhaps come up with some use cases.

Cheers,

    Paolo

On 5/9/13 10:52 AM, Manik Surtani wrote:
Hi Matt.

Thanks for resurrecting and sorry for not responding on the original thread.

Interesting discussions. I would intuitively agree with Bela were it not for the fact that, as you said, you're actually storing data, require high availability of that data during server failure, and may possibly chain tasks (as per a local fork-join) and want to maintain server affinity across related tasks.

You could achieve all of this with JGroups but would end up re-implementing certain bits of Infinispan, specifically:
- task failover
- task cancelling
- consistent hash based node selection for delivery, ensuring affinity across data and tasks

So I think it is a pretty valid use case to make use of what Infinispan already has.

I'll have a look at your sources and comment more. I'm sure Paolo and others at CloudTM will probably do the same.

Cheers
Manik

On 9 May 2013, at 01:50, Matt Hoffman <[email protected] <mailto:[email protected]>> wrote:

Resurrecting this topic: I've put some sample code on my fork here: https://github.com/matthoffman/infinispan/tree/dfj There's a README there that offers an overview. I hope to do some performance comparisons over the weekend, but in the meantime, the code is there if you're curious.

I'm mainly interested in whether this is the type of thing that Infinispan would want, as an alternative to the current distributed executor (much like the ForkJoinPool in the JDK is an alternative to a traditional thread pool). If so, the next steps would be to do some performance tests, pick one implementation to move forward with (out of the 3 prototypes on that branch) and clean it up sufficiently to be considered for a pull request.

If it's not something that Infinispan is interested in, then I'll change approaches and make it more generic, so that it isn't Infinispan-specific and can be used with other transports. I'm open to either option... I think there are pros and cons either way.


Thanks!


matt



On Mon, Mar 4, 2013 at 7:07 AM, Paolo Romano <[email protected] <mailto:[email protected]>> wrote:

    This sounds really interesting Matt. In the Cloud-TM project
    (www.cloudtm.eu <http://www.cloudtm.eu/>) we are currently
    developing a parallel graph-analysis algorithm on top of
    Infinispan's DEF. I would be really curious to take a look at the
    framework you developed, and see how it could be exploited in our
    application.

    Regards,

        Paolo


--
    Paolo Romano, PhD
    Coordinator of the Cloud-TM ICT FP7 Project (www.cloudtm.eu <http://www.cloudtm.eu/>)
    Senior Researcher @ INESC-ID (www.inesc-id.pt <http://www.inesc-id.pt/>)
    Assistant Professor @ Instituto Superior Tecnico (www.ist.utl.pt <http://www.ist.utl.pt/>)
    Rua Alves Redol, 9
    1000-059, Lisbon Portugal
    Tel. +351 21 3100300
    Fax +351 21 3145843
    Webpage: http://www.gsd.inesc-id.pt/~romanop



    On 3/2/13 5:40 PM, matt hoffman wrote:

    Hey guys,

    I've been working on a prototype of integrating Infinispan into
    our app.  We do a lot of distributed processing across a small
    cluster, so I've played with Infinispan's existing distributed
    execution framework (which is nice), as well as using Infinispan
    alongside a normal message queue to distribute tasks.  But I've
    also put together a prototype of a new distributed execution
    framework using fork-join pools that you all might be interested
    in.  If it sounds like something that would be worthwhile for
    Infinispan, I can raise a Jira and submit a pull request with
    what I have so far. I'd need to get the CA and company policy
    stuff finalized; that might take a couple days. Meanwhile, in
    case there is any interest, I've described the approach I've
    taken below.

    First, a little background:

    A while back I worked on a side project that integrated a
    distributed work-stealing algorithm into the standard JDK
    fork-join queue.  It used JGroups for communication, because it
    was quick and easy for prototyping. So this week I thought I'd
    take a stab at porting that over to Infinispan. The algorithm I
    came up with for Infinispan is a bit less of a work-stealing
    algorithm, to take advantage of Infinispan's built-in
    distribution capabilities, but I think it's still fairly efficient.

    My basic approach was to take in a cache in the constructor,
    much like the existing distributed executor, and then create a
    parallel, DIST-mode cache that uses the same hash & grouping
    configuration as the original cache. That new parallel cache is
    the "task cache", and we use that to distribute available tasks
    across the cluster. It's a distributed cache so that tasks are
    partitioned across a large cluster, and it uses the hashing
    config of the original cache and a KeyAffinityService to attempt
    to distribute the tasks to the same nodes that contain the data
    being worked on. Nodes use cache listeners to be notified when
    there is new work available, and the atomic replace() to "check
    out" the tasks for execution, and "check in" the results.
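
    As a rough illustration of the check-out/check-in step, here is a
    minimal sketch. It is not the code on the branch: a
    ConcurrentHashMap stands in for the task cache (an Infinispan
    Cache is also a ConcurrentMap, so the same replace() call is
    available there), and TaskEntry, TaskCheckoutSketch, checkOut()
    and checkIn() are made-up names. In a real DIST-mode cache the
    value type would also have to be marshallable.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical immutable value stored in the task cache (not from the dfj branch).
final class TaskEntry {
    enum State { AVAILABLE, CHECKED_OUT, DONE }

    final State state;
    final String owner;   // node holding the task, or null while AVAILABLE
    final Object result;  // set at check-in, otherwise null

    TaskEntry(State state, String owner, Object result) {
        this.state = state;
        this.owner = owner;
        this.result = result;
    }

    static TaskEntry available() {
        return new TaskEntry(State.AVAILABLE, null, null);
    }

    // replace(key, expected, update) compares values with equals(), so define it.
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TaskEntry)) return false;
        TaskEntry other = (TaskEntry) o;
        return state == other.state
                && Objects.equals(owner, other.owner)
                && Objects.equals(result, other.result);
    }

    @Override
    public int hashCode() {
        return Objects.hash(state, owner, result);
    }
}

final class TaskCheckoutSketch {
    // Assumption: a plain in-memory map stands in for the distributed task cache.
    final ConcurrentMap<String, TaskEntry> taskCache = new ConcurrentHashMap<>();

    // Returns true only for the one node whose atomic replace() succeeds.
    boolean checkOut(String taskId, String node) {
        TaskEntry seen = taskCache.get(taskId);
        if (seen == null || seen.state != TaskEntry.State.AVAILABLE) return false;
        TaskEntry claimed = new TaskEntry(TaskEntry.State.CHECKED_OUT, node, null);
        return taskCache.replace(taskId, seen, claimed);
    }

    // Only the node that checked the task out may check the result in.
    boolean checkIn(String taskId, String node, Object result) {
        TaskEntry seen = taskCache.get(taskId);
        if (seen == null || seen.state != TaskEntry.State.CHECKED_OUT
                || !node.equals(seen.owner)) return false;
        TaskEntry done = new TaskEntry(TaskEntry.State.DONE, node, result);
        return taskCache.replace(taskId, seen, done);
    }
}
```

    If two nodes race to check out the same task, both read the same
    AVAILABLE entry but only one replace() can succeed; the loser just
    moves on to another task.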

    The basic algorithm is something like this:

    For a refresher, a normal FJ pool has a fork() method that takes
    in a task, and then places that task on an internal queue
    (actually, one of several queues). When threads are idle, they
    look to the nearest work queue for work. If that work queue does
    not have work, they "steal" work from another thread's queue. So
    in the best case, tasks remain on the same thread as the task
    that spawned them, so tasks that process the same data as their
    parents may still have that data in the CPU's cache, etc.
    There's more to it than that, but that's the basic idea.
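
    For anyone who hasn't used it, a plain (non-distributed) fork-join
    task in the JDK looks roughly like this. It is only a generic
    example of fork()/join(); SumTask and its threshold are made up
    for illustration and have nothing to do with the prototype.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums a slice of an array by splitting it in half until the slice is small.
final class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // arbitrary cut-off for this example

    private final long[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    SumTask(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) sum += data[i];
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                    // queued on this worker; idle workers may steal it
        long rightSum = right.compute(); // keep working on the other half in this thread
        return rightSum + left.join();  // wait for (or run) the forked half
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        long total = new ForkJoinPool().invoke(new SumTask(data, 0, data.length));
        System.out.println(total);
    }
}
```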

    This distributed algorithm just adds an extra layer on top for
    tasks that are marked "distributable" (by extending
    DistributedFJTask instead of the normal ForkJoinTask). When you
    call fork() with a DistributedFJTask, it first checks to see if
    the local pool's work queue is empty. If so, we just go ahead
    and submit it locally; there's no reason to distribute it. If
    not, we put the task in the task cache, and let Infinispan
    distribute it. When a node has no more work to do in its
    internal fork-join queues, it looks at the task cache and tries
    to pull work from there.
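
    In very simplified form, the fork() decision and the "pull when
    idle" step could be sketched like this. Everything here is a
    stand-in rather than the prototype's code: DistributedForkSketch,
    fork() and pullOne() are hypothetical names, tasks are plain
    Runnables, a ConcurrentHashMap plays the role of the task cache,
    and the "is the local queue empty?" check is passed in as a
    BooleanSupplier so the sketch does not depend on pool internals.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.BooleanSupplier;

final class DistributedForkSketch {
    // Assumption: an in-memory map stands in for the distributed task cache.
    final ConcurrentMap<String, Runnable> taskCache = new ConcurrentHashMap<>();

    private final Executor localPool;
    private final BooleanSupplier localQueueEmpty;

    DistributedForkSketch(Executor localPool, BooleanSupplier localQueueEmpty) {
        this.localPool = localPool;
        this.localQueueEmpty = localQueueEmpty;
    }

    // Returns true if the task was submitted locally, false if it went to the task cache.
    boolean fork(String taskId, Runnable task) {
        if (localQueueEmpty.getAsBoolean()) {
            localPool.execute(task);  // nothing queued locally: no reason to distribute
            return true;
        }
        taskCache.put(taskId, task);  // local node is busy: let the cache distribute it
        return false;
    }

    // Called when the local pool runs out of work; returns true if a task was pulled.
    boolean pullOne() {
        for (String taskId : taskCache.keySet()) {
            Runnable task = taskCache.remove(taskId); // atomic: only one caller gets it
            if (task != null) {
                localPool.execute(task);
                return true;
            }
        }
        return false;
    }
}
```

    With a real ForkJoinPool as the executor, the emptiness check
    might be something like pool.getQueuedTaskCount() == 0 &&
    !pool.hasQueuedSubmissions(); both are only estimates, which is
    fine for a heuristic like this one.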

    So, it isn't really a "work-stealing" algorithm, per se; the
    distributable tasks are being distributed eagerly using
    Infinispan's normal cache distribution. But I'm hoping that
    doing that also makes it easier to handle node failure, since
    nodes collectively share a common picture of the work to be done.

    This approach required one change to the actual FJ classes
    themselves (in org.infinispan.util.concurrent.jdk8backported).
    That's probably the most controversial change. I had to make the
    original ForkJoinTask's fork() method non-final in order to
    extend it cleanly. There's probably a way around that, but
    that's the cleanest option I have thought of thus far.

    And lastly, it's not done yet: basic task distribution is
    working, but I haven't tackled failover to any real extent yet.
    The biggest questions, though, are around what to do with the
    existing distributed execution interfaces. For example,
    DistributedTask has a getCallable() method because it assumes
    it's wrapping a Callable. But ForkJoinTasks don't extend
    Callable. I could put in a shim to wrap the DistributedFJTasks
    into Callables for the sake of that method, but I don't know if
    it's worth it.  Similarly, the DistributedExecutorService
    interface exposes a lot of submit-to-specific-address or
    submit-to-all-addresses methods, which are an odd fit here since
    tasks are distributed via their own cache.  Even if I used a
    KeyAffinityService to target the task to the given Address, it
    might get picked up by another node that shares that same
    hash. But I can add a direct-to-single-Address capability
    if that seems worthwhile. Alternatively, I can just use entirely
    different interfaces (DistributedFJExecutorService,
    DistributedFJTask?).
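
    For reference, the Callable shim mentioned above could be as small
    as the sketch below. ForkJoinCallableShim and asCallable() are
    hypothetical names, and this ignores everything a real
    getCallable() implementation would have to deal with
    (marshalling, cancellation, and so on); it only shows that a
    ForkJoinTask can be run through the Callable interface via
    invoke().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinTask;

final class ForkJoinCallableShim {
    private ForkJoinCallableShim() {}

    // Wraps a fork-join task so that call() runs it and returns its result.
    static <V> Callable<V> asCallable(final ForkJoinTask<V> task) {
        return new Callable<V>() {
            @Override
            public V call() {
                // invoke() runs the task (in the calling thread if outside a pool)
                // and rethrows any exception the task threw.
                return task.invoke();
            }
        };
    }
}
```

    The JDK already covers the opposite direction with
    ForkJoinTask.adapt(Callable), so the two models can be bridged
    either way.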

    Thoughts?  Concerns?  Glaring issues?



    _______________________________________________
    infinispan-dev mailing list
    [email protected]  <mailto:[email protected]>
    https://lists.jboss.org/mailman/listinfo/infinispan-dev



--
Manik Surtani
[email protected] <mailto:[email protected]>
twitter.com/maniksurtani <http://twitter.com/maniksurtani>

Platform Architect, JBoss Data Grid
http://red.ht/data-grid


