Sometime back I was playing with Spark and Tachyon and I also found this
issue .  The issue here is TachyonBlockManager put the blocks in
WriteType.TRY_CACHE configuration . And because of this Blocks ate evicted
from Tachyon Cache when Memory is full and when Spark try to find the block
it throws  BlockNotFoundException .

To solve this I tried Hierarchical Storage on Tachyon ( http://tachyon
-project.org/Hierarchy-Storage-on-Tachyon.html ) , and that seems to have
worked and I did not see any any Spark Job failed due to
BlockNotFoundException.
below is my  Hierarchical Storage settings which I used..

  -Dtachyon.worker.hierarchystore.level.max=2
  -Dtachyon.worker.hierarchystore.level0.alias=MEM
  -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER

-Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
  -Dtachyon.worker.hierarchystore.level1.alias=HDD
  -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
  -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
  -Dtachyon.worker.allocate.strategy=MAX_FREE
  -Dtachyon.worker.evict.strategy=LRU

Regards,
Dibyendu

On Wed, Aug 26, 2015 at 12:25 PM, Todd <bit1...@163.com> wrote:

>
> I am using tachyon in the spark program below,but I encounter a
> BlockNotFoundxception.
> Does someone know what's wrong and also is there guide on how to configure
> spark to work with Tackyon?Thanks!
>
>     conf.set("spark.externalBlockStore.url", "tachyon://10.18.19.33:19998
> ")
>     conf.set("spark.externalBlockStore.baseDir","/spark")
>     val sc = new SparkContext(conf)
>     import org.apache.spark.storage.StorageLevel
>     val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
>     rdd.persist(StorageLevel.OFF_HEAP)
>     val count = rdd.count()
>    val sum = rdd.reduce(_ + _)
>     println(s"The count: $count, The sum is: $sum")
>
>
> 15/08/26 14:52:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
> have all completed, from pool
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 5
> in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage
> 0.0 (TID 5, localhost): java.lang.RuntimeException:
> org.apache.spark.storage.BlockNotFoundException: Block rdd_0_5 not found
>     at
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:308)
>     at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>     at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>     at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>     at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>     at
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>     at
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>     at
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>     at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>     at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>     at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>     at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>     at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>     at
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
>     at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>     at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>     at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>     at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>     at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>     at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>     at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>     at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>     at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>     at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>     at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>     at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>     at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>
>
>
>
>

Reply via email to