Hi Sean, I ran into the same problem,
but when I changed MASTER="local" to MASTER="local[2]", everything went
back to normal. I haven't had a chance to ask about it here. (A minimal
sketch of the change is below the quoted message.)

Best,

--
Nan Zhu

On Friday, May 30, 2014 at 9:09 AM, Sean Owen wrote:
> Guys, I'm struggling to debug some strange behavior in a simple
> Streaming + Java + Kafka example -- in fact, a simplified version of
> JavaKafkaWordCount that just calls print() on a sequence of messages.
>
> Data is flowing, but it only appears to work for a few periods --
> sometimes 0 -- before ceasing to call any actions. Sorry for the
> volume of log posting, but it may illustrate to someone who knows
> this better what is happening.
>
> The key action in the logs seems to be as follows -- it works a few times:
>
> ...
> 2014-05-30 13:53:50 INFO ReceiverTracker:58 - Stream 0 received 0 blocks
> 2014-05-30 13:53:50 INFO JobScheduler:58 - Added jobs for time 1401454430000 ms
> -------------------------------------------
> Time: 1401454430000 ms
> -------------------------------------------
>
> 2014-05-30 13:53:50 INFO JobScheduler:58 - Starting job streaming job 1401454430000 ms.0 from job set of time 1401454430000 ms
> 2014-05-30 13:53:50 INFO JobScheduler:58 - Finished job streaming job 1401454430000 ms.0 from job set of time 1401454430000 ms
> 2014-05-30 13:53:50 INFO JobScheduler:58 - Total delay: 0.004 s for time 1401454430000 ms (execution: 0.000 s)
> 2014-05-30 13:53:50 INFO MappedRDD:58 - Removing RDD 2 from persistence list
> 2014-05-30 13:53:50 INFO BlockManager:58 - Removing RDD 2
> 2014-05-30 13:53:50 INFO BlockRDD:58 - Removing RDD 1 from persistence list
> 2014-05-30 13:53:50 INFO BlockManager:58 - Removing RDD 1
> 2014-05-30 13:53:50 INFO KafkaInputDStream:58 - Removing blocks of RDD BlockRDD[1] at BlockRDD at ReceiverInputDStream.scala:69 of time 1401454430000 ms
> 2014-05-30 13:54:00 INFO ReceiverTracker:58 - Stream 0 received 0 blocks
> 2014-05-30 13:54:00 INFO JobScheduler:58 - Added jobs for time 1401454440000 ms
> ...
>
> Then it works, with some additional, different output in the logs --
> here you can see output is flowing too:
>
> ...
> 2014-05-30 13:54:20 INFO ReceiverTracker:58 - Stream 0 received 2 blocks
> 2014-05-30 13:54:20 INFO JobScheduler:58 - Added jobs for time 1401454460000 ms
> 2014-05-30 13:54:20 INFO JobScheduler:58 - Starting job streaming job 1401454460000 ms.0 from job set of time 1401454460000 ms
> 2014-05-30 13:54:20 INFO SparkContext:58 - Starting job: take at DStream.scala:593
> 2014-05-30 13:54:20 INFO DAGScheduler:58 - Got job 1 (take at DStream.scala:593) with 1 output partitions (allowLocal=true)
> 2014-05-30 13:54:20 INFO DAGScheduler:58 - Final stage: Stage 1(take at DStream.scala:593)
> 2014-05-30 13:54:20 INFO DAGScheduler:58 - Parents of final stage: List()
> 2014-05-30 13:54:20 INFO DAGScheduler:58 - Missing parents: List()
> 2014-05-30 13:54:20 INFO DAGScheduler:58 - Computing the requested partition locally
> 2014-05-30 13:54:20 INFO BlockManager:58 - Found block input-0-1401454458400 locally
> 2014-05-30 13:54:20 INFO SparkContext:58 - Job finished: take at DStream.scala:593, took 0.007007 s
> 2014-05-30 13:54:20 INFO SparkContext:58 - Starting job: take at DStream.scala:593
> 2014-05-30 13:54:20 INFO DAGScheduler:58 - Got job 2 (take at DStream.scala:593) with 1 output partitions (allowLocal=true)
> 2014-05-30 13:54:20 INFO DAGScheduler:58 - Final stage: Stage 2(take at DStream.scala:593)
> 2014-05-30 13:54:20 INFO DAGScheduler:58 - Parents of final stage: List()
> 2014-05-30 13:54:20 INFO DAGScheduler:58 - Missing parents: List()
> 2014-05-30 13:54:20 INFO DAGScheduler:58 - Computing the requested partition locally
> 2014-05-30 13:54:20 INFO BlockManager:58 - Found block input-0-1401454459400 locally
> 2014-05-30 13:54:20 INFO SparkContext:58 - Job finished: take at DStream.scala:593, took 0.002217 s
> -------------------------------------------
> Time: 1401454460000 ms
> -------------------------------------------
> 99,true,-0.11342268416043325
> 17,false,1.6732879882133793
> ...
>
> Then it keeps repeating the following, with no more evidence that the
> print() action is being called:
>
> ...
> 2014-05-30 13:54:20 INFO JobScheduler:58 - Finished job streaming job 1401454460000 ms.0 from job set of time 1401454460000 ms
> 2014-05-30 13:54:20 INFO MappedRDD:58 - Removing RDD 8 from persistence list
> 2014-05-30 13:54:20 INFO JobScheduler:58 - Total delay: 0.019 s for time 1401454460000 ms (execution: 0.015 s)
> 2014-05-30 13:54:20 INFO BlockManager:58 - Removing RDD 8
> 2014-05-30 13:54:20 INFO BlockRDD:58 - Removing RDD 7 from persistence list
> 2014-05-30 13:54:20 INFO BlockManager:58 - Removing RDD 7
> 2014-05-30 13:54:20 INFO KafkaInputDStream:58 - Removing blocks of RDD BlockRDD[7] at BlockRDD at ReceiverInputDStream.scala:69 of time 1401454460000 ms
> 2014-05-30 13:54:20 INFO MemoryStore:58 - ensureFreeSpace(100) called with curMem=201, maxMem=2290719129
> 2014-05-30 13:54:20 INFO MemoryStore:58 - Block input-0-1401454460400 stored as bytes to memory (size 100.0 B, free 2.1 GB)
> 2014-05-30 13:54:20 INFO BlockManagerInfo:58 - Added input-0-1401454460400 in memory on 192.168.1.10:60886 (size: 100.0 B, free: 2.1 GB)
> 2014-05-30 13:54:20 INFO BlockManagerMaster:58 - Updated info of block input-0-1401454460400
> 2014-05-30 13:54:20 WARN BlockManager:70 - Block input-0-1401454460400 already exists on this machine; not re-adding it
> 2014-05-30 13:54:20 INFO BlockGenerator:58 - Pushed block input-0-1401454460400
> 2014-05-30 13:54:21 INFO MemoryStore:58 - ensureFreeSpace(100) called with curMem=301, maxMem=2290719129
> 2014-05-30 13:54:21 INFO MemoryStore:58 - Block input-0-1401454461400 stored as bytes to memory (size 100.0 B, free 2.1 GB)
> 2014-05-30 13:54:21 INFO BlockManagerInfo:58 - Added input-0-1401454461400 in memory on 192.168.1.10:60886 (size: 100.0 B, free: 2.1 GB)
> 2014-05-30 13:54:21 INFO BlockManagerMaster:58 - Updated info of block input-0-1401454461400
> 2014-05-30 13:54:21 WARN BlockManager:70 - Block input-0-1401454461400 already exists on this machine; not re-adding it
> 2014-05-30 13:54:21 INFO BlockGenerator:58 - Pushed block input-0-1401454461400
> 2014-05-30 13:54:22 INFO MemoryStore:58 - ensureFreeSpace(99) called with curMem=401, maxMem=2290719129
> 2014-05-30 13:54:22 INFO MemoryStore:58 - Block input-0-1401454462400 stored as bytes to memory (size 99.0 B, free 2.1 GB)
> 2014-05-30 13:54:22 INFO BlockManagerInfo:58 - Added input-0-1401454462400 in memory on 192.168.1.10:60886 (size: 99.0 B, free: 2.1 GB)
> ...
>
> Occasionally it says:
>
> ...
> 2014-05-30 13:54:30 INFO ReceiverTracker:58 - Stream 0 received 10 blocks
> 2014-05-30 13:54:30 INFO JobScheduler:58 - Added jobs for time 1401454470000 ms
> 2014-05-30 13:54:30 INFO JobScheduler:58 - Starting job streaming job 1401454470000 ms.0 from job set of time 1401454470000 ms
> 2014-05-30 13:54:30 INFO SparkContext:58 - Starting job: take at DStream.scala:593
> 2014-05-30 13:54:30 INFO DAGScheduler:58 - Got job 3 (take at DStream.scala:593) with 1 output partitions (allowLocal=true)
> 2014-05-30 13:54:30 INFO DAGScheduler:58 - Final stage: Stage 3(take at DStream.scala:593)
> 2014-05-30 13:54:30 INFO DAGScheduler:58 - Parents of final stage: List()
> 2014-05-30 13:54:30 INFO DAGScheduler:58 - Missing parents: List()
> 2014-05-30 13:54:30 INFO DAGScheduler:58 - Computing the requested partition locally
> 2014-05-30 13:54:30 INFO BlockManager:58 - Found block input-0-1401454460400 locally
> 2014-05-30 13:54:30 INFO SparkContext:58 - Job finished: take at DStream.scala:593, took 0.003993 s
> 2014-05-30 13:54:30 INFO SparkContext:58 - Starting job: take at DStream.scala:593
> 2014-05-30 13:54:30 INFO DAGScheduler:58 - Got job 4 (take at DStream.scala:593) with 9 output partitions (allowLocal=true)
> 2014-05-30 13:54:30 INFO DAGScheduler:58 - Final stage: Stage 4(take at DStream.scala:593)
> 2014-05-30 13:54:30 INFO DAGScheduler:58 - Parents of final stage: List()
> 2014-05-30 13:54:30 INFO DAGScheduler:58 - Missing parents: List()
> 2014-05-30 13:54:30 INFO DAGScheduler:58 - Submitting Stage 4 (MappedRDD[12] at map at MappedDStream.scala:35), which has no missing parents
> 2014-05-30 13:54:30 INFO DAGScheduler:58 - Submitting 9 missing tasks from Stage 4 (MappedRDD[12] at map at MappedDStream.scala:35)
> 2014-05-30 13:54:30 INFO TaskSchedulerImpl:58 - Adding task set 4.0 with 9 tasks
> ...
>
> Output is definitely continuing to be written to Kafka; you can even
> see that it seems to be acknowledging that the stream is seeing more
> data.
>
> The same happens with operations like saving to a file. It looks like
> the action is no longer being scheduled.
>
> Does that ring any bells? I'm at a loss!
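
For what it's worth, here is a minimal sketch of the kind of setup that hit
this for me, with the fix applied. The usual explanation is that under
MASTER="local" the receiver occupies the only worker thread, so once it
starts there is nothing left to run the print()/take jobs; "local[2]" or
higher leaves a thread free for batch processing. The class name, ZooKeeper
quorum, consumer group, and topic below are placeholders mirroring the
JavaKafkaWordCount example, not Sean's actual code:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class KafkaPrintExample {
  public static void main(String[] args) throws Exception {
    // "local" gives the receiver the only thread and starves the batch
    // jobs; "local[2]" (or more) leaves a thread for processing.
    SparkConf conf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("KafkaPrintExample");

    // 10-second batches, matching the interval visible in the logs above.
    JavaStreamingContext jssc =
        new JavaStreamingContext(conf, new Duration(10000));

    // Placeholder Kafka settings: ZooKeeper quorum, consumer group,
    // and a map of topic -> number of receiver threads.
    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("test-topic", 1);
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, "localhost:2181", "test-group", topics);

    // The output action that stopped being scheduled under MASTER="local".
    messages.print();

    jssc.start();
    jssc.awaitTermination();
  }
}

The same applies however the master is set -- via spark-submit, an
environment variable, or in code -- if it resolves to plain "local", any
receiver-based input stream will starve its own processing jobs.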