Hi all,

following some of the ideas above, I was able to implement a first,
non-clustered version that shows the intention (please see the code below).
It would be awesome to receive direct feedback.

First, the approach of using a modified ExecutionContext did not work out.
In particular, I do not see a way to put the result of body: => T into a
promise while only having access to the execute method of the
ExecutionContext. Thus, I introduced dfuture( ... body ... ), which creates a
Runnable

   - that creates a promise,
   - passes that promise to the actor
   - and relies on the created caller actor to complete the promise.
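
To illustrate the limitation with the ExecutionContext, here is a minimal sketch using only the standard library. InspectingContext and InspectingContextDemo are hypothetical names made up for this example; the sketch only shows that execute() is handed an opaque Runnable and never the value of body.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical wrapper: counts what execute() receives, then delegates.
final class InspectingContext(underlying: ExecutionContext) extends ExecutionContext {
  val submitted = new AtomicInteger(0)

  override def execute(runnable: Runnable): Unit = {
    // Only an opaque Runnable arrives here. run() returns Unit, so neither the
    // value of `body` nor the promise it completes is reachable at this point.
    submitted.incrementAndGet()
    underlying.execute(runnable)
  }

  override def reportFailure(cause: Throwable): Unit =
    underlying.reportFailure(cause)
}

object InspectingContextDemo {
  // Returns the future's result and the number of Runnables seen by execute().
  def run(): (Int, Int) = {
    val ec = new InspectingContext(ExecutionContext.global)
    val result = Await.result(Future(21 * 2)(ec), 5.seconds)
    (result, ec.submitted.get)
  }
}
```

The wrapper can reroute or count the Runnable, but it has no typed handle on the result, which is why the promise has to be created outside of the ExecutionContext.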

With this approach, however, and because the ExecutionContext did not work out,
combining distributed futures (in the sense of monadic operations like map /
flatMap) does not seem possible: dfuture( ... ) returns a regular Future, so
subsequent map / flatMap operations run locally again (instead of
distributed). Here, a DistributedFuture class / object would be reasonable in
my opinion. Do you see any chance to avoid this?
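
A rough sketch of what such a wrapper could look like, assuming a hypothetical Runner abstraction that decides where a thunk is executed. Runner, LocalRunner, DFuture and DFutureDemo are illustrative names, not part of Akka or the standard library. LocalRunner simply runs on a local ExecutionContext; a clustered variant would send the thunk to a callee actor instead.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical abstraction: decides where a thunk runs (e.g. on a remote actor).
trait Runner {
  def run[A](thunk: () => A): Future[A]
}

// Stand-in for this sketch: runs thunks on a local ExecutionContext.
final class LocalRunner(ec: ExecutionContext) extends Runner {
  def run[A](thunk: () => A): Future[A] = Future(thunk())(ec)
}

// Hypothetical wrapper whose combinators hand every step back to the Runner,
// so map / flatMap bodies are dispatched the same way as the initial body.
final class DFuture[A](val underlying: Future[A])(implicit runner: Runner, glue: ExecutionContext) {
  def map[B](f: A => B): DFuture[B] =
    new DFuture(underlying.flatMap(a => runner.run(() => f(a))))

  def flatMap[B](f: A => DFuture[B]): DFuture[B] =
    new DFuture(underlying.flatMap(a => runner.run(() => f(a)).flatMap(_.underlying)))
}

object DFuture {
  def apply[A](body: => A)(implicit runner: Runner, glue: ExecutionContext): DFuture[A] =
    new DFuture(runner.run(() => body))
}

object DFutureDemo {
  def compute(): Int = {
    // `glue` is only used for the local bookkeeping between steps.
    implicit val glue: ExecutionContext = ExecutionContext.global
    implicit val runner: Runner = new LocalRunner(glue)
    val combined = for {
      x1 <- DFuture(42)
      x2 <- DFuture(43)
    } yield x1 * x2
    Await.result(combined.underlying, 5.seconds)
  }
}
```

The functions passed to map / flatMap would still have to be shipped to the remote node, so this does not avoid the serialization question; it only keeps the combinators from falling back to purely local execution.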

Second, passing a Function0 to an actor (neglecting the function context /
closure for now) needs to be worked out. How (see code comments) can one
create an instance of Function0 on the remote side that is invoked by the
callee actor after instantiation? For the future, SIP-21 seems the way to
go.
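
As a starting point for experiments, the following sketch round-trips a Function0 through plain Java serialization within one JVM. ThunkSerialization and its methods are hypothetical names. It assumes the code is compiled as an ordinary top-level object (not typed into the REPL) and that the same class files exist on the receiving side; it does not address remote class loading or closures that capture non-serializable state.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object ThunkSerialization {
  // Serialize a thunk, including the values it captures, to a byte array.
  def toBytes[T](thunk: () => T): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(thunk) finally out.close()
    buffer.toByteArray
  }

  // Rebuild the thunk; the cast is unchecked, so T must match the sender's type.
  def fromBytes[T](bytes: Array[Byte]): () => T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[() => T] finally in.close()
  }

  // Captures `factor`, sends the thunk through bytes, and invokes the copy.
  def roundTrip(factor: Int): Int = {
    val thunk: () => Int = () => factor * 2
    fromBytes[Int](toBytes(thunk))()
  }
}
```

If the thunk captured something that is not serializable (for example an enclosing instance holding an ActorSystem), writeObject would fail with a NotSerializableException, which is the problem spores are meant to make explicit.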

Looking forward to your feedback,

Martin




import scala.concurrent.{Promise, Future, ExecutionContext}
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
import akka.actor._

/**
 * Provides the dfuture method to create a distributed future.
 * A distributed future is a future that is run on a (remote) actor.
 *
 * @example {{{
 * // actorSystem must be given implicitly
 *
 * val sampleFuture1:Future[Int] = dfuture {
 *   for ( i <- 1 to 5000 ) {
 *     println("distributed_future_1")
 *   }
 *   42
 * }
 *
 * val sampleFuture2:Future[Int] = dfuture {
 *   for ( i <- 1 to 5000 ) {
 *     println("distributed_future_2")
 *   }
 *   43
 * }
 *
 * val combinedFuture = for {
 *   x1 <- sampleFuture1
 *   x2 <- sampleFuture2
 * } yield x1*x2
 * }}}
 */
object DistributedFuture {
  def dfuture[T](body: => T)(implicit execctx: ExecutionContext, asystem: ActorSystem): Future[T] =
    DistributedFutureImpl[T](body)
}

/**
 * Distributed future implementation.
 */
object DistributedFutureImpl {

  /**
   * Actor on the caller side, which delegates Execute messages to a freshly
   * created remote actor to trigger execution of the given body.
   * On receipt of a Try[T], the underlying promise is completed.
   */
  class CallerActor[T](promise: Promise[T]) extends Actor with ActorLogging {
    val callee = context.actorOf(Props[CalleeActor[T]])

    def receive = {
      case msg : Execute[T] ⇒ {
        log.debug("Calling callee with execute msg.")
        callee ! msg
      }
      case t : Try[T] ⇒ {
        log.debug("Caller received Try.")
        promise complete t

        log.debug("Caller will stop (and thus will also stop child callee).")
        context.stop( self )
      }
    }
  }

  /**
   * Execute case class. Used by CallerActor and CalleeActor.
   * @param body is the code to execute on callee (remote node) site.
   */
  case class Execute[T](body: () => T)

  /**
   * A (remote) actor that executes a given body
   */
  class CalleeActor[T] extends Actor with ActorLogging {
    def receive = {
      case Execute(body:( () => T )) ⇒ {
        log.debug("Callee received execute.")
        try {
          val result:T = body()
          log.debug("Callee was successful with execution.")
          sender ! Success(result)
        } catch {
          case NonFatal(e) => {
            sender ! Failure(e)
          }
        }
      }
    }
  }

  /**
   * A runnable wrapping the execution of body, whose result is put into a
   * promise.
   *
   * Idea shamelessly stolen from concurrent.impl.Future
   *
   * @param body the body to execute
   * @tparam T return type of this distributed future.
   */
  class ActorCompletedPromiseRunnable[T](body: => T, system: ActorSystem) extends Runnable {
    val promise: Promise[T] = Promise[T]()

    override def run(): Unit = {
      // Is this safe? And does this properly survive a restart of the actor?
      val caller = system.actorOf( Props(classOf[CallerActor[T]], promise) )
      // Right now, this is very prototypical and will very probably not
      // work in a cluster (with remote nodes).
      //
      // Suggestion 1: As the body "() => T" is of type Function0, one could
      //               use reflection to instantiate and invoke the function
      //               on the remote side. Does this work for anonymous
      //               functions? If so, how? The function context does not
      //               seem to be available with this approach.
      // Suggestion 2: Use SIP-21 to serialize the function and, more
      //               importantly, the function context.
      caller ! Execute( () => body )
    }
  }

  /**
   * Apply method to create a regular future (via the promise used in
   * ActorCompletedPromiseRunnable).
   * @param body is going to be executed
   * @param executor the execution context used to run the runnable
   * @param actorSystem the actor system in which the caller actor is created
   * @tparam T is the return type of the future
   * @return the future of the promise that the caller actor completes
   */
  def apply[T](body: => T)(implicit executor: ExecutionContext, actorSystem: ActorSystem): scala.concurrent.Future[T] = {
    val runnable = new ActorCompletedPromiseRunnable(body, actorSystem)
    executor.prepare.execute(runnable)
    runnable.promise.future
  }
}


2014-02-28 12:44 GMT+01:00 √iktor Ҡlang <[email protected]>:

>
>
>
> On Fri, Feb 28, 2014 at 10:58 AM, Martin Senne <
> [email protected]> wrote:
>
>> Hi all,
>>
>> first, thanks for the very constructive answers.
>>
>> @Roland: As you pointed out, the future context is the most crucial part
>> when distributing futures. I followed your suggestion and had a deep look
>> into spores ( http://docs.scala-lang.org/sips/pending/spores.html ),
>> which seem to perfectly fit the requirements. Unfortunately, I was not able
>> to gain information about the current status of SIP-21. Is it somewhere
>> available?
>>
>> @Endre and Roland: Concerning remote class loading: I do not see why one
>> would want to do that. Is the assumption "code is already distributed to
>> all nodes" not valid? Asked the other way round: Under which
>> circumstances does one need to distribute code as well?
>>
>> @Viktor: Do you refer to "Idiomatic Akka (! / tell) versus Non-blocking
>> Akka Futures ( ? / ask)" (
>> https://groups.google.com/forum/?hl=de#!searchin/akka-user/e$20programming$20language/akka-user/GlMq6J4ZlAc/sDtayngFKMwJ)
>>  ? The discussion done there was very helpful in that it covers the
>> problems of different "execution context"s with the ask pattern when
>> completing the future. Thanks!
>>
>
> Great! You can also search the archives for "promise pipelining" for more
> discussion on the topic.
>
>
>>
>> For this first "PoC" of distributed futures I'll neglect
>>
>>    - code distribution
>>    - retrieval of available remote nodes
>>    - distribution strategy.
>>
>> I'll try and start with the ExecutionContext. Do you know of any
>> supplementary material for "creating a custom ExecutionContext"?
>>
>> Looking forward to hearing your comments,
>>
>> Martin
>>
>>
>> 2014-02-27 10:55 GMT+01:00 √iktor Ҡlang <[email protected]>:
>>
>>> There was a discussion related to the E programming language on the
>>> mailing list not that long ago that might be of interest for this topic.
>>>
>>>
>>> On Thu, Feb 27, 2014 at 10:41 AM, Akka Team <[email protected]> wrote:
>>>
>>>> Hi Martin,
>>>>
>>>> As Roland pointed out it is possible especially if you have the
>>>> corresponding classes available on all nodes. On the other hand I think the
>>>> preferred way is to use actors whenever it is possible instead of futures.
>>>> If you still need the result as a Future for some reason you can still fall
>>>> back to the ask pattern.
>>>>
>>>> -Endre
>>>>
>>>>
>>>> On Thu, Feb 27, 2014 at 8:10 AM, Roland Kuhn <[email protected]> wrote:
>>>>
>>>>> Hello Martin,
>>>>>
>>>>> you can certainly create an ExecutionContext which is constructed from
>>>>> an ActorContext (to do the actor creation and remote deployment, and also
>>>>> to access the Cluster extension for finding out about possible destination
>>>>> nodes) and which does what you describe. If the work to be performed is
>>>>> worth the overhead of going over the network then this is a quite nifty
>>>>> idea, actually.
>>>>>
>>>>> The main problem you will encounter is that you need to ascertain that
>>>>> the function you send over the network is serializable (including all the
>>>>> context it captures—whether it needs that or not) and that the
>>>>> corresponding class files are available on the remote nodes as well. The
>>>>> former will probably get you interested in Spores (SIP-21) and the latter
>>>>> might lead you into the abyss that is called remote class loading (don’t
>>>>> go there if you have a choice).
>>>>>
>>>>> Regards,
>>>>>
>>>>> Roland
>>>>>
>>>>> On 26 Feb 2014, at 16:02, Martin Senne <[email protected]> wrote:
>>>>>
>>>>> Hi *,
>>>>>
>>>>> recently and while browsing code and investigating how the ask pattern
>>>>> for actors is implemented, the question "Is it reasonable and possible to
>>>>> execute futures on actors on distributed nodes of an Akka cluster" came to
>>>>> my mind. Expressed differently: Can we automatically distribute the work
>>>>> that futures do across an Akka cluster?
>>>>>
>>>>> Is there already a solution that I have overlooked?
>>>>>
>>>>>
>>>>> *Suggestions: Steps to execute a future on a remote node*
>>>>>
>>>>>
>>>>>    1. create a promise locally
>>>>>    2. create one local actor (caller)
>>>>>    3. create one new remote actor on any of the cluster nodes
>>>>>    (callee), via round robin, etc.
>>>>>    4. send message to callee actor to start computation
>>>>>    5. ... actor comes eventually to execution ...
>>>>>    6. on completion of the computation, callee sends back
>>>>>       - either a success message (containing the result of the future
>>>>>       computation) or
>>>>>       - a failure message (containing the Exception) to the caller
>>>>>    7. on receipt of either a success or failure message, the caller
>>>>>    puts the result into the promise
>>>>>
>>>>> Does this approach seem valid?
>>>>>
>>>>> Within step 3 the "automatism" comes into play. Questions to be
>>>>> answered: Where is the remote actor created? Which strategy does one
>>>>> choose?
>>>>> Nevertheless, the user should not be bothered with these details.
>>>>> Instead, the user should just configure which remote nodes form the
>>>>> cluster and are available to execute work.
>>>>>
>>>>>
>>>>> *Where to start ?*
>>>>>
>>>>> On the one hand, a future comes to execution (wrapped as Runnable) via
>>>>> the execute method of an ExecutionContext. If one sets a different
>>>>> (implicit) ExecutionContext, this behavior can be changed.
>>>>> On the other hand the implementation in scala.concurrent.impl.Future
>>>>> and scala.concurrent.impl.Future.PromiseCompletingRunnable could allow a
>>>>> different approach.
>>>>>
>>>>> What is a reasonable starting point?
>>>>>
>>>>>
>>>>> Looking forward to hearing your comments and ideas,
>>>>>
>>>>> Martin
>>>>>
>>>>> --
>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>> >>>>>>>>>> Check the FAQ: http://akka.io/faq/
>>>>> >>>>>>>>>> Search the archives:
>>>>> https://groups.google.com/group/akka-user
>>>>> ---
>>>>> You received this message because you are subscribed to the Google
>>>>> Groups "Akka User List" group.
>>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>>> an email to [email protected].
>>>>> To post to this group, send email to [email protected].
>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>> For more options, visit https://groups.google.com/groups/opt_out.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> *Dr. Roland Kuhn*
>>>>> *Akka Tech Lead*
>>>>> Typesafe <http://typesafe.com/> – Reactive apps on the JVM.
>>>>> twitter: @rolandkuhn
>>>>>  <http://twitter.com/#!/rolandkuhn>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Akka Team
>>>> Typesafe - The software stack for applications that scale
>>>> Blog: letitcrash.com
>>>> Twitter: @akkateam
>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> Cheers,
>>> √
>>>
>>> * ——————— **Viktor Klang*
>>> *Chief Architect - **Typesafe <http://www.typesafe.com/>*
>>>
>>>  Twitter: @viktorklang
>>>
>>>
>>
>>
>
>
>
> --
> Cheers,
> √
>
> * ——————— **Viktor Klang*
> *Chief Architect - **Typesafe <http://www.typesafe.com/>*
>
>  Twitter: @viktorklang
>
>
