can you just replace "Duration.Inf" with a shorter duration ? how about
import scala.concurrent.duration._
val timeout = new Timeout(10 seconds)
Await.result(result.future, timeout.duration)
or
val timeout = new FiniteDuration(10, TimeUnit.SECONDS)
Await.result(result.future, timeout)
or simply
import scala.concurrent.duration._
Await.result(result.future, 10 seconds)
On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb <[email protected]> wrote:
> Hi all, i meet up with a problem that torrent broadcast hang out in my
> spark cluster (1.2, standalone) , particularly serious when driver and
> executors are cross-region. when i read the code of broadcast i found that
> a sync block read here:
>
> def fetchBlockSync(host: String, port: Int, execId: String, blockId:
> String): ManagedBuffer = {
> // A monitor for the thread to wait on.
> val result = Promise[ManagedBuffer]()
> fetchBlocks(host, port, execId, Array(blockId),
> new BlockFetchingListener {
> override def onBlockFetchFailure(blockId: String, exception:
> Throwable): Unit = {
> result.failure(exception)
> }
> override def onBlockFetchSuccess(blockId: String, data:
> ManagedBuffer): Unit = {
> val ret = ByteBuffer.allocate(data.size.toInt)
> ret.put(data.nioByteBuffer())
> ret.flip()
> result.success(new NioManagedBuffer(ret))
> }
> })
>
> Await.result(result.future, Duration.Inf)
> }
>
> it seems that fetchBlockSync method does not have a timeout limit but wait
> forever ? Anybody can show me how to control the timeout here?
>