There are a couple of options. One is to increase the relevant timeout (see the Spark configuration docs).
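For example, a minimal sketch of raising the ask timeout when building the streaming context (this assumes Spark 1.x with the Akka-based RPC, where the 30-second timeout in your trace looks like the default of spark.akka.askTimeout; the app name and batch interval below are just placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: give the driver's ask RPCs (including block-cleanup messages) more
// than the default 30 seconds before they are considered failed.
val conf = new SparkConf()
  .setAppName("flume-streaming-app")       // placeholder name
  .set("spark.akka.askTimeout", "120")     // seconds; default is 30
val ssc = new StreamingContext(conf, Seconds(10))  // illustrative batch interval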

Also see past threads on this mailing list.

Another option you may try (I have a gut feeling it may work, but I am not
sure) is calling GC on the driver periodically. The cleanup of old blocks is
tied to the GC of the corresponding RDD objects, so regular GC may keep things
clean more steadily rather than in unpredictable bursts of GC activity.
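A rough sketch of what I mean, to be run on the driver (the 10-minute interval is arbitrary):

import java.util.concurrent.{Executors, TimeUnit}

// Sketch: periodically ask the JVM to GC on the driver, so that unreferenced
// RDD/block metadata becomes collectable (and hence cleanable) at a steady pace.
val gcExecutor = Executors.newSingleThreadScheduledExecutor()
gcExecutor.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = System.gc()
}, 10, 10, TimeUnit.MINUTES)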

Let us know how it works out.

TD

On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal <nb.nos...@gmail.com> wrote:

> I have a standalone and local Spark streaming process where we are reading
> inputs using FlumeUtils. Our longest window size is 6 hours. After about a
> day and a half of running without any issues, we start seeing Timeout
> errors while cleaning up input blocks. This seems to cause reading from
> Flume to cease.
>
>
> ERROR sparkDriver-akka.actor.default-dispatcher-78 BlockManagerSlaveActor.logError - Error in removing block input-0-1428182594000
> org.apache.spark.SparkException: Error sending message [message = UpdateBlockInfo(BlockManagerId(<driver>, localhost, 55067),input-0-1428182594000,StorageLevel(false, false, false, false, 1),0,0,0)]
>   at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201)
>   at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
>   at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
>   at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385)
>   at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361)
>   at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105)
>   at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44)
>   at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
>   at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
>   at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>   at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>   at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
>   at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
>   at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
>   at scala.concurrent.Await$.result(package.scala:107)
>   at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
>   ... 17 more
>
> There was a similar query posted here:
> http://apache-spark-user-list.1001560.n3.nabble.com/Block-removal-causes-Akka-timeouts-td15632.html
> but I did not find any resolution to that issue.
>
>
> Thanks in advance,
> NB
>
>
