It looks like the file size reported by the FSDataInputStream of Tachyon's FileSystem interface is zero, which is why the WAL reader's seek fails with "Seek position is past EOF".
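For context, the failing read path boils down to a seek to a recorded segment offset followed by a length-prefixed read. Below is a minimal, self-contained Java sketch of that pattern (simplified; the 4-byte length-prefix layout mirrors Spark's FileBasedWriteAheadLog format, but the class and helper names here are illustrative, not Spark's actual code):

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;

public class WalReadSketch {
    // Sketch of the random-read path: seek to the segment offset,
    // read a 4-byte length prefix, then read the payload.
    static byte[] readSegment(Path wal, long offset) throws IOException {
        try (RandomAccessFile f = new RandomAccessFile(wal.toFile(), "r")) {
            // This is the guard that fires in the logs below: Tachyon's
            // HdfsFileInputStream.seek rejects any seek past the reported
            // file size -- and here that size is (wrongly) reported as 0.
            if (offset >= f.length()) {
                throw new IllegalArgumentException(
                    "Seek position is past EOF: " + offset
                    + ", fileSize = " + f.length());
            }
            f.seek(offset);
            byte[] payload = new byte[f.readInt()];  // 4-byte length prefix
            f.readFully(payload);
            return payload;
        }
    }

    public static void main(String[] args) throws IOException {
        Path wal = Files.createTempFile("log-", ".wal");
        byte[] record = "block-data".getBytes("UTF-8");
        try (RandomAccessFile f = new RandomAccessFile(wal.toFile(), "rw")) {
            f.writeInt(record.length);  // length prefix
            f.write(record);            // payload
        }
        // A valid offset reads the record back.
        System.out.println(new String(readSegment(wal, 0), "UTF-8"));
        // The offset from the failing log line, against a too-small
        // reported size, reproduces the exception in the stack trace.
        try {
            readSegment(wal, 645603894L);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a correctly reported file length the guard never fires for a real segment offset; the failure below happens because Tachyon reports fileSize = 0, so every recorded offset looks like it is past EOF.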
On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:

> Just to follow up on this thread further.
>
> I was doing some fault-tolerance testing of Spark Streaming with Tachyon as
> the OFF_HEAP block store. As I said in an earlier email, I was able to solve
> the BlockNotFoundException when I used Tachyon's Hierarchical Storage, which
> is good.
>
> I continued testing with the Spark Streaming WAL and checkpoint files also
> stored in Tachyon. Here are a few findings:
>
> When I store the Spark Streaming checkpoint location in Tachyon, the
> throughput is much higher. I tested the Driver and Receiver failure cases,
> and Spark Streaming is able to recover without any data loss on Driver
> failure.
>
> *But on Receiver failure, Spark Streaming loses data*, as I see an
> exception while reading the WAL file from the Tachyon "receivedData"
> location for the same Receiver id which just failed.
>
> If I change the checkpoint location back to HDFS, Spark Streaming can
> recover from both Driver and Receiver failure.
>
> Here are the log details from when the Spark Streaming receiver failed. I
> raised a JIRA for this issue: https://issues.apache.org/jira/browse/SPARK-7525
>
> INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2 (epoch 1)*
> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to remove executor 2 from BlockManagerMaster.
> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing block manager BlockManagerId(2, 10.252.5.54, 45789)
> INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2 successfully in removeExecutor
> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered receiver for stream 2 from 10.252.5.62*:47255
> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)*
>   at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>   at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>   at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>   at org.apache.spark.scheduler.Task.run(Task.scala:70)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.IllegalArgumentException: *Seek position is past EOF: 645603894, fileSize = 0*
>   at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>   at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>   at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>   ... 15 more
>
> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in stage 103.0 (TID 422) on executor 10.252.5.61: org.apache.spark.SparkException (Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)) [duplicate 1]
> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3 in stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
> INFO : org.apache.spark.deploy.client.AppClient$ClientActor - Executor updated: app-20150511104442-0048/2 is now LOST (worker lost)
> INFO : org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Executor app-20150511104442-0048/2 removed: worker lost
> ERROR: org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Asked to remove non-existent executor 2
> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.3 in stage 103.0 (TID 423) on executor 10.252.5.62: org.apache.spark.SparkException (Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)) [duplicate 2]
> ERROR: org.apache.spark.scheduler.TaskSetManager - Task 2 in stage 103.0 failed 4 times; aborting job
> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 103.0, whose tasks have all completed, from pool
> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Cancelling stage 103
> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 103 (foreachRDD at Consumer.java:92) failed in 0.943 s
> INFO : org.apache.spark.scheduler.DAGScheduler - Job 120 failed: foreachRDD at Consumer.java:92, took 0.953482 s
> ERROR: org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1431341145000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 103.0 failed 4 times, most recent failure: Lost task 2.3 in stage 103.0 (TID 423, 10.252.5.62): org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)
>   at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>   at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>   at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>   at org.apache.spark.scheduler.Task.run(Task.scala:70)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.IllegalArgumentException: Seek position is past EOF: 645603894, fileSize = 0
>   at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>   at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>   at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>   ... 15 more
>
> On Fri, May 8, 2015 at 11:03 PM, Haoyuan Li <haoyuan...@gmail.com> wrote:
>
>> Thanks for the updates!
>>
>> Best,
>>
>> Haoyuan
>>
>> On Fri, May 8, 2015 at 8:40 AM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Just a follow-up on this thread.
>>>
>>> I tried Hierarchical Storage on Tachyon
>>> (http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html), and that
>>> seems to have worked: I did not see any Spark job fail due to
>>> BlockNotFoundException.
>>> Below are my Hierarchical Storage settings:
>>>
>>> -Dtachyon.worker.hierarchystore.level.max=2
>>> -Dtachyon.worker.hierarchystore.level0.alias=MEM
>>> -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
>>> -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
>>> -Dtachyon.worker.hierarchystore.level1.alias=HDD
>>> -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
>>> -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
>>> -Dtachyon.worker.allocate.strategy=MAX_FREE
>>> -Dtachyon.worker.evict.strategy=LRU
>>>
>>> Regards,
>>> Dibyendu
>>>
>>> On Thu, May 7, 2015 at 1:46 PM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
>>>
>>> > Dear All,
>>> >
>>> > I have been playing with Spark Streaming on Tachyon as the OFF_HEAP
>>> > block store. The primary reason for evaluating Tachyon is to find out
>>> > whether Tachyon can solve the Spark BlockNotFoundException.
>>> >
>>> > With the traditional MEMORY_ONLY StorageLevel, jobs fail with block
>>> > not found exceptions when blocks are evicted, and storing blocks in
>>> > MEMORY_AND_DISK is not a good option either, as it impacts the
>>> > throughput a lot.
>>> >
>>> > To test how Tachyon behaves, I took the latest Spark 1.4 from master,
>>> > used Tachyon 0.6.4, and configured Tachyon in Fault Tolerant Mode.
>>> > Tachyon is running in a 3-node AWS x-large cluster, and Spark is
>>> > running in a 3-node AWS x-large cluster.
>>> >
>>> > I have used the low-level Receiver-based Kafka consumer
>>> > (https://github.com/dibbhatt/kafka-spark-consumer), which I have
>>> > written, to pull from Kafka and write blocks to Tachyon.
>>> >
>>> > I found a similar improvement in throughput (as in the MEMORY_ONLY
>>> > case) but very good overall memory utilization (as it is an off-heap
>>> > store).
>>> >
>>> > But I found one issue on which I need clarification.
>>> >
>>> > In the Tachyon case also, I see BlockNotFoundException, but for a
>>> > different reason. What I see is that TachyonBlockManager.scala puts
>>> > the blocks with the WriteType.TRY_CACHE configuration. Because of
>>> > this, blocks are evicted from the Tachyon cache, and when Spark tries
>>> > to find a block it throws BlockNotFoundException.
>>> >
>>> > I see a pull request which discusses the same:
>>> >
>>> > https://github.com/apache/spark/pull/158#discussion_r11195271
>>> >
>>> > When I modified the WriteType to CACHE_THROUGH, the BlockDropException
>>> > is gone, but it again impacts the throughput.
>>> >
>>> > Just curious to know if Tachyon has any setting which can handle the
>>> > block eviction from cache to disk, other than explicitly setting
>>> > CACHE_THROUGH?
>>> >
>>> > Regards,
>>> > Dibyendu
>>> >
>>
>> --
>> Haoyuan Li
>> CEO, Tachyon Nexus <http://www.tachyonnexus.com/>
>> AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
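[Editor's note] The TRY_CACHE vs. CACHE_THROUGH trade-off discussed in the quoted thread can be illustrated with a toy model. This is not the Tachyon API, just a self-contained sketch: a bounded LRU map stands in for the in-memory tier, a second map for the under filesystem (e.g. HDFS). TRY_CACHE writes only to the cache, so eviction silently drops the block; CACHE_THROUGH also persists it, surviving eviction at the cost of an extra write per block.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class WriteTypeSketch {
    enum WriteType { TRY_CACHE, CACHE_THROUGH }

    static final int CAPACITY = 2;  // tiny cache to force eviction quickly

    // Bounded LRU cache standing in for the in-memory tier.
    static final Map<String, byte[]> cache =
        new LinkedHashMap<String, byte[]>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, byte[]> e) {
                return size() > CAPACITY;  // LRU eviction when the tier is full
            }
        };

    // Stand-in for the under filesystem beneath the cache.
    static final Map<String, byte[]> underStore = new HashMap<>();

    static void put(String blockId, byte[] data, WriteType type) {
        cache.put(blockId, data);
        if (type == WriteType.CACHE_THROUGH) {
            underStore.put(blockId, data);  // extra write: durable but slower
        }
    }

    static byte[] get(String blockId) {
        byte[] b = cache.get(blockId);
        if (b == null) {
            b = underStore.get(blockId);  // fall back to the under store
        }
        return b;  // null here is the BlockNotFoundException case
    }

    public static void main(String[] args) {
        put("input-2-1", new byte[]{1}, WriteType.TRY_CACHE);
        put("input-2-2", new byte[]{2}, WriteType.CACHE_THROUGH);
        put("input-2-3", new byte[]{3}, WriteType.TRY_CACHE);  // evicts input-2-1
        System.out.println(get("input-2-1"));           // lost on eviction
        System.out.println(get("input-2-2") != null);   // survives eviction
    }
}
```

The hierarchical-storage settings quoted earlier in the thread attack the same problem differently: instead of writing through to the under filesystem, an evicted block moves from the MEM tier to an HDD tier and stays readable, which is why it avoided the BlockNotFoundException without the CACHE_THROUGH throughput penalty.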