It looks like the file size reported by the FSDataInputStream from
Tachyon's FileSystem interface is somehow coming back as zero.
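
To make the failure mode concrete, here is a minimal, self-contained sketch of a seek that is bounds-checked against a reported file size. It is not Tachyon's code: the class and method names (SeekPastEofSketch, SizeCheckedStream, trySeek) are hypothetical, and the error message is only modeled on the one in the trace below. I am also assuming the two numbers in the FileBasedWriteAheadLogSegment record are the segment's offset and length. Under those assumptions, a reported size of 0 rejects any positive offset, even if the bytes are physically present.

```java
public class SeekPastEofSketch {

    /** Hypothetical stand-in for a seekable stream that trusts a reported file size. */
    static final class SizeCheckedStream {
        private final long reportedFileSize;
        private long position;

        SizeCheckedStream(long reportedFileSize) {
            this.reportedFileSize = reportedFileSize;
        }

        /** Rejects any position beyond the size the file system reported. */
        void seek(long pos) {
            if (pos < 0) {
                throw new IllegalArgumentException("Seek position is negative: " + pos);
            }
            if (pos > reportedFileSize) {
                throw new IllegalArgumentException(
                    "Seek position is past EOF: " + pos + ", fileSize = " + reportedFileSize);
            }
            position = pos;
        }

        long position() {
            return position;
        }
    }

    /** Returns "ok" if the seek succeeds, otherwise the exception message. */
    static String trySeek(long reportedFileSize, long offset) {
        SizeCheckedStream in = new SizeCheckedStream(reportedFileSize);
        try {
            in.seek(offset);
            return "ok";
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Offset and length taken from the WAL segment in the log (assumed meaning).
        long offset = 645603894L;
        long length = 10891919L;

        // Reported size of zero: the seek is rejected before any read is attempted.
        System.out.println(trySeek(0L, offset));

        // Reported size covering the whole segment: the same seek succeeds.
        System.out.println(trySeek(offset + length, offset));
    }
}
```

If this reading is right, the thing to check is what length the file system reports for the log file after the receiver dies, rather than the read path in Spark itself.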

On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Just to follow up on this thread.
>
> I was doing some fault-tolerance testing of Spark Streaming with Tachyon as
> the OFF_HEAP block store. As I said in my earlier email, I was able to solve
> the BlockNotFoundException by using Tachyon's Hierarchical Storage,
> which is good.
>
> I continued testing by also storing the Spark Streaming WAL and
> checkpoint files in Tachyon. Here are a few findings.
>
>
> When I store the Spark Streaming checkpoint location in Tachyon, the
> throughput is much higher. I tested the Driver and Receiver failure cases,
> and Spark Streaming is able to recover without any data loss on Driver
> failure.
>
> *But on Receiver failure, Spark Streaming loses data*, as I see an
> exception while reading the WAL file from the Tachyon "receivedData"
> location for the same Receiver id that just failed.
>
> If I change the checkpoint location back to HDFS, Spark Streaming can
> recover from both Driver and Receiver failure.
>
> Here are the log details from when the Spark Streaming receiver failed. I
> raised a JIRA for the same issue:
> https://issues.apache.org/jira/browse/SPARK-7525
>
>
>
> INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2 (epoch
> 1)*
> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
> remove executor 2 from BlockManagerMaster.
> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
> block manager BlockManagerId(2, 10.252.5.54, 45789)
> INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
> successfully in removeExecutor
> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered
> receiver for stream 2 from 10.252.5.62*:47255
> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage
> 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could not
> read data from write ahead log record
> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)*
> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
> at
> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
> at
> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
> at scala.Option.getOrElse(Option.scala:120)
> at
> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.IllegalArgumentException:* Seek position is past
> EOF: 645603894, fileSize = 0*
> at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
> at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
> ... 15 more
>
> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in
> stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in stage
> 103.0 (TID 422) on executor 10.252.5.61: org.apache.spark.SparkException
> (Could not read data from write ahead log record
> FileBasedWriteAheadLogSegment(tachyon-ft://
> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
> [duplicate 1]
> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3 in
> stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
> INFO : org.apache.spark.deploy.client.AppClient$ClientActor - Executor
> updated: app-20150511104442-0048/2 is now LOST (worker lost)
> INFO : org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend -
> Executor app-20150511104442-0048/2 removed: worker lost
> ERROR: org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend -
> Asked to remove non-existent executor 2
> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.3 in stage
> 103.0 (TID 423) on executor 10.252.5.62: org.apache.spark.SparkException
> (Could not read data from write ahead log record
> FileBasedWriteAheadLogSegment(tachyon-ft://
> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
> [duplicate 2]
> ERROR: org.apache.spark.scheduler.TaskSetManager - Task 2 in stage 103.0
> failed 4 times; aborting job
> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet
> 103.0, whose tasks have all completed, from pool
> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Cancelling stage 103
> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 103
> (foreachRDD at Consumer.java:92) failed in 0.943 s
> INFO : org.apache.spark.scheduler.DAGScheduler - Job 120 failed:
> foreachRDD at Consumer.java:92, took 0.953482 s
> ERROR: org.apache.spark.streaming.scheduler.JobScheduler - Error running
> job streaming job 1431341145000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 2
> in stage 103.0 failed 4 times, most recent failure: Lost task 2.3 in stage
> 103.0 (TID 423, 10.252.5.62): org.apache.spark.SparkException: Could not
> read data from write ahead log record
> FileBasedWriteAheadLogSegment(tachyon-ft://
> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
> )
> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
> at
> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
> at
> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
> at scala.Option.getOrElse(Option.scala:120)
> at
> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.IllegalArgumentException: Seek position is past EOF:
> 645603894, fileSize = 0
> at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
> at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
> ... 15 more
>
>
>
>
>
>
> On Fri, May 8, 2015 at 11:03 PM, Haoyuan Li <haoyuan...@gmail.com> wrote:
>
>> Thanks for the updates!
>>
>> Best,
>>
>> Haoyuan
>>
>> On Fri, May 8, 2015 at 8:40 AM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Just a follow-up on this thread.
>>>
>>> I tried Hierarchical Storage on Tachyon (
>>> http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html ), and that
>>> seems to have worked: I did not see any Spark job fail due to
>>> BlockNotFoundException.
>>> Below are my Hierarchical Storage settings:
>>>
>>>   -Dtachyon.worker.hierarchystore.level.max=2
>>>   -Dtachyon.worker.hierarchystore.level0.alias=MEM
>>>   -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
>>>   -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
>>>   -Dtachyon.worker.hierarchystore.level1.alias=HDD
>>>   -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
>>>   -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
>>>   -Dtachyon.worker.allocate.strategy=MAX_FREE
>>>   -Dtachyon.worker.evict.strategy=LRU
>>>
>>> Regards,
>>> Dibyendu
>>>
>>> On Thu, May 7, 2015 at 1:46 PM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
>>> > Dear All,
>>> >
>>> > I have been playing with Spark Streaming on Tachyon as the OFF_HEAP
>>> > block store. The primary reason for evaluating Tachyon is to find out
>>> > whether Tachyon can solve the Spark BlockNotFoundException.
>>> >
>>> > With the traditional MEMORY_ONLY StorageLevel, when blocks are evicted,
>>> > jobs fail with a block-not-found exception, and storing blocks in
>>> > MEMORY_AND_DISK is not a good option either, as it hurts throughput a
>>> > lot.
>>> >
>>> >
>>> > To test how Tachyon behaves, I took the latest Spark 1.4 from master,
>>> > used Tachyon 0.6.4, and configured Tachyon in Fault Tolerant Mode.
>>> > Tachyon is running on a 3-node AWS x-large cluster, and Spark is
>>> > running on a 3-node AWS x-large cluster.
>>> >
>>> > I used the low-level, Receiver-based Kafka consumer (
>>> > https://github.com/dibbhatt/kafka-spark-consumer ), which I wrote
>>> > to pull from Kafka and write blocks to Tachyon.
>>> >
>>> > I found a similar improvement in throughput (as in the MEMORY_ONLY
>>> > case) but very good overall memory utilization (as it is an off-heap
>>> > store).
>>> >
>>> >
>>> > But I found one issue on which I need clarification.
>>> >
>>> >
>>> > With Tachyon I also see BlockNotFoundException, but for a
>>> > different reason. From what I see, TachyonBlockManager.scala puts the
>>> > blocks with the WriteType.TRY_CACHE configuration. Because of this,
>>> > blocks are evicted from the Tachyon cache, and when Spark tries to find
>>> > the block it throws BlockNotFoundException.
>>> >
>>> > I see a pull request that discusses the same issue:
>>> >
>>> > https://github.com/apache/spark/pull/158#discussion_r11195271
>>> >
>>> >
>>> > When I modified the WriteType to CACHE_THROUGH, the BlockDropException
>>> > was gone, but it again hurt the throughput.
>>> >
>>> >
>>> > Just curious to know whether Tachyon has any setting that can handle
>>> > block eviction from cache to disk, other than explicitly setting
>>> > CACHE_THROUGH?
>>> >
>>> > Regards,
>>> > Dibyendu
>>> >
>>> >
>>> >
>>>
>>
>>
>>
>> --
>> Haoyuan Li
>> CEO, Tachyon Nexus <http://www.tachyonnexus.com/>
>> AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
>>
>
>
