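ask() is available on every ActorRef, via the "ask" pattern in the Akka
library, which Spark uses for a lot of the communication between its
components. ask() does not process the message itself: it sends the message
and returns a Future, and that Future is completed with whatever the
receiving actor replies with.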

There is some documentation on ask() here (go to the section on "Send
messages"):
http://doc.akka.io/docs/akka/2.2.3/scala/actors.html

If you are totally new to Akka, though, you might want to work through a
simple Akka tutorial first, before diving into the docs.
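
To make the round trip concrete, here is a minimal sketch of the ask pattern,
written against the Akka 2.2.x API that the link above documents. It is not
Spark's code: GetPeerNames, DemoDriverActor and AskDemo are hypothetical names
made up for the illustration, standing in for the GetPeers message and the
driver-side actor. The point is only that the value you get back from
Await.result is the one the receiving actor sends with "sender ! ...".

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical message, standing in for Spark's GetPeers.
case class GetPeerNames(numPeers: Int)

// Hypothetical receiving actor, standing in for the driver-side actor.
class DemoDriverActor extends Actor {
  def receive = {
    case GetPeerNames(n) =>
      // The reply is constructed here, on the receiving side.
      val peers = (1 to n).map(i => "peer-" + i)
      // Sending it back to `sender` is what completes the asker's Future.
      sender ! peers
  }
}

object AskDemo {
  def askForPeers(numPeers: Int): Seq[String] = {
    val system = ActorSystem("demo")
    try {
      val driver = system.actorOf(Props[DemoDriverActor], "driver")
      implicit val timeout: Timeout = Timeout(5.seconds)

      // ask() returns a Future[Any] right away; it does no processing itself.
      val future = driver.ask(GetPeerNames(numPeers))

      // Block until the reply arrives or the timeout expires.
      Await.result(future, timeout.duration).asInstanceOf[Seq[String]]
    } finally {
      // Akka 2.2.x API (an assumption about your version);
      // newer releases use terminate() instead.
      system.shutdown()
    }
  }

  def main(args: Array[String]): Unit =
    println(askForPeers(3))
}
```

So when you are looking for where 'result' comes from, search for the actor
whose receive block has a case for the message you sent, rather than for a
definition of ask() that knows about that message.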

On Fri, Nov 7, 2014 at 4:11 AM, rapelly kartheek <kartheek.m...@gmail.com>
wrote:

>  Hi,
>
> I am trying to understand how the
> /spark/*/Storage/BlockManagerMaster.askDriverWithReply() works.
>
> def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
>   val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
>   if (result.length != numPeers) {
>     throw new SparkException(
>       "Error getting peers, only got " + result.size + " instead of " + numPeers)
>   }
>   result
> }
>
> Here, getPeers calls askDriverWithReply().
>
> private def askDriverWithReply[T](message: Any): T = {
>   // TODO: Consider removing multiple attempts
>   if (driverActor == null) {
>     throw new SparkException("Error sending message to BlockManager as driverActor is null " +
>       "[message = " + message + "]")
>   }
>   var attempts = 0
>   var lastException: Exception = null
>   while (attempts < AKKA_RETRY_ATTEMPTS) {
>     attempts += 1
>     try {
>       val future = driverActor.ask(message)(timeout)
>       val result = Await.result(future, timeout)
>       if (result == null) {
>         throw new SparkException("BlockManagerMaster returned null")
>       }
>       return result.asInstanceOf[T]
>     } catch {
>       case ie: InterruptedException => throw ie
>       case e: Exception =>
>         lastException = e
>         logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
>     }
>     Thread.sleep(AKKA_RETRY_INTERVAL_MS)
>   }
>   throw new SparkException(
>     "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
> }
>
> Here, the getPeers method calls askDriverWithReply() with the message
> GetPeers(), and the driver returns the BlockManagerIds.
>
> val future = driverActor.ask(message)(timeout)
> val result = Await.result(future, timeout)
>
> Here, we obtain "result". But I couldn't find the definition of ask() that
> processes the GetPeers() message. Can someone please tell me how and where
> 'result' is constructed?
>
> Thank you!!
> Karthik
>
