Re: custom receiver in java
Yes, thanks for updating this old thread! We heard our community's demands and added support for Java receivers! TD On Wed, Jun 4, 2014 at 12:15 PM, lbustelo g...@bustelos.com wrote: Note that what TD was referring to above is already in 1.0.0: http://spark.apache.org/docs/1.0.0/streaming-custom-receivers.html
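[Editor's note] For reference, a minimal custom receiver along the lines of the 1.0.0 guide linked above might look like the sketch below (Scala shown; the Java API mirrors it). The class name, host, and port are illustrative, not from the thread.
===
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver that reads newline-separated records from a socket.
class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Receive on a background thread so onStart() returns quickly.
    new Thread("Line Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = { /* the receive() loop checks isStopped() */ }

  private def receive(): Unit = {
    val socket = new Socket(host, port)
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    var line = reader.readLine()
    while (!isStopped() && line != null) {
      store(line)               // hand each record to Spark Streaming
      line = reader.readLine()
    }
    reader.close()
    socket.close()
    restart("Connection lost, restarting")  // let the framework restart us
  }
}
===
Hooked up with ssc.receiverStream(new LineReceiver("localhost", 9999)), this yields a DStream[String] like any built-in source.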
Re: Failed to remove RDD error
It was not intended to be experimental, as this improves general performance. We have tested the feature since 0.9 and didn't see any problems. We need to investigate the cause of this. Can you give us the logs showing this error so that we can analyze it? TD On Tue, Jun 3, 2014 at 10:08 AM, Michael Chang m...@tellapart.com wrote: Thanks Tathagata, Thanks for all your hard work! In the future, is it possible to mark experimental features as such in the online documentation? Thanks, Michael On Mon, Jun 2, 2014 at 6:12 PM, Tathagata Das tathagata.das1...@gmail.com wrote: spark.streaming.unpersist was an experimental feature introduced with Spark 0.9 (but kept disabled), which actively clears off RDDs that are not useful any more. In Spark 1.0 it has been enabled by default. It is possible that this is an unintended side effect of that. If spark.cleaner.ttl works, then that should be used. TD On Mon, Jun 2, 2014 at 9:42 AM, Michael Chang m...@tellapart.com wrote: Hey Mayur, Thanks for the suggestion, I didn't realize that was configurable. I don't think I'm running out of memory, though it does seem like these errors go away when I turn off the spark.streaming.unpersist configuration and use spark.cleaner.ttl instead. Do you know if there are known issues with the unpersist option? On Sat, May 31, 2014 at 12:17 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: You can increase your akka timeout; that should give you some more life. Are you running out of memory by any chance? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Sat, May 31, 2014 at 6:52 AM, Michael Chang m...@tellapart.com wrote: I'm running some Kafka streaming Spark contexts (on 0.9.1), and they seem to be dying after 10 or so minutes with a lot of these errors. I can't really tell what's going on here, except that maybe the driver is unresponsive somehow? Has anyone seen this before? 14/05/31 01:13:30 ERROR BlockManagerMaster: Failed to remove RDD 12635 akka.pattern.AskTimeoutException: Timed out at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334) at akka.actor.Scheduler$$anon$11.run(Scheduler.scala:118) at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:691) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:688) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:455) at akka.actor.LightArrayRevolverScheduler$$anon$12.executeBucket$1(Scheduler.scala:407) at akka.actor.LightArrayRevolverScheduler$$anon$12.nextTick(Scheduler.scala:411) at akka.actor.LightArrayRevolverScheduler$$anon$12.run(Scheduler.scala:363) at java.lang.Thread.run(Thread.java:744) Thanks, Mike
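[Editor's note] The two settings discussed in this thread are ordinary Spark configuration properties. A minimal sketch of the workaround Michael describes (app name and TTL value are illustrative):
===
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("KafkaStreamingJob")            // illustrative app name
  // Disable the active unpersisting of old RDDs discussed above...
  .set("spark.streaming.unpersist", "false")
  // ...and fall back to time-based metadata cleanup (TTL in seconds).
  .set("spark.cleaner.ttl", "3600")

val ssc = new StreamingContext(conf, Seconds(1))
===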
Re: NoSuchElementException: key not found
I think I know what is going on! This is probably a race condition in the DAGScheduler. I have added a JIRA for this. The fix is not trivial though. https://issues.apache.org/jira/browse/SPARK-2002 A not-so-good workaround for now would be to not use coalesced RDDs, which avoids the race condition. TD On Tue, Jun 3, 2014 at 10:09 AM, Michael Chang m...@tellapart.com wrote: I only had the warning level logs, unfortunately. There were no other references to 32855 (except a repeated stack trace, I believe). I'm using Spark 0.9.1. On Mon, Jun 2, 2014 at 5:50 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Do you have the info level logs of the application? Can you grep the value 32855 to find any references to it? Also, what version of Spark are you using (so that I can match the stack trace; it does not seem to match Spark 1.0)? TD On Mon, Jun 2, 2014 at 3:27 PM, Michael Chang m...@tellapart.com wrote: Hi all, I'm seeing a random exception kill my Spark Streaming job. Here's a stack trace: java.util.NoSuchElementException: key not found: 32855 at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.mutable.HashMap.apply(HashMap.scala:64) at org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:211) at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1072) at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:716) at org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:172) at org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:189) at org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:188) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:351) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350) at org.apache.spark.rdd.PartitionCoalescer$LocationIterator.init(CoalescedRDD.scala:183) at org.apache.spark.rdd.PartitionCoalescer.setupGroups(CoalescedRDD.scala:234) at org.apache.spark.rdd.PartitionCoalescer.run(CoalescedRDD.scala:333) at org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:81) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:31) 
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.RDD.take(RDD.scala:830) at org.apache.spark.api.java.JavaRDDLike$class.take(JavaRDDLike.scala:337) at org.apache.spark.api.java.JavaRDD.take(JavaRDD.scala:27) at com.tellapart.manifolds.spark.ManifoldsUtil$PersistToKafkaFunction.call(ManifoldsUtil.java:87) at com.tellapart.manifolds.spark.ManifoldsUtil$PersistToKafkaFunction.call(ManifoldsUtil.java:53) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:270) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:270) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:520
Re: NoSuchElementException: key not found
I am not sure what DStream operations you are using, but some operation is internally creating CoalescedRDDs, and that is causing the race condition. I might be able to help if you can tell me what DStream operations you are using. TD On Tue, Jun 3, 2014 at 4:54 PM, Michael Chang m...@tellapart.com wrote: Hi Tathagata, Thanks for your help! By not using coalesced RDDs, do you mean not repartitioning my DStream? Thanks, Mike On Tue, Jun 3, 2014 at 12:03 PM, Tathagata Das tathagata.das1...@gmail.com wrote: I think I know what is going on! This is probably a race condition in the DAGScheduler. I have added a JIRA for this. The fix is not trivial though. https://issues.apache.org/jira/browse/SPARK-2002 A not-so-good workaround for now would be to not use coalesced RDDs, which avoids the race condition. TD On Tue, Jun 3, 2014 at 10:09 AM, Michael Chang m...@tellapart.com wrote: I only had the warning level logs, unfortunately. There were no other references to 32855 (except a repeated stack trace, I believe). I'm using Spark 0.9.1. On Mon, Jun 2, 2014 at 5:50 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Do you have the info level logs of the application? Can you grep the value 32855 to find any references to it? Also, what version of Spark are you using (so that I can match the stack trace; it does not seem to match Spark 1.0)? TD On Mon, Jun 2, 2014 at 3:27 PM, Michael Chang m...@tellapart.com wrote: Hi all, I'm seeing a random exception kill my Spark Streaming job. Here's a stack trace: java.util.NoSuchElementException: key not found: 32855 at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.mutable.HashMap.apply(HashMap.scala:64) at org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:211) at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1072) at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:716) at org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:172) at org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:189) at org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:188) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:351) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350) at org.apache.spark.rdd.PartitionCoalescer$LocationIterator.init(CoalescedRDD.scala:183) at org.apache.spark.rdd.PartitionCoalescer.setupGroups(CoalescedRDD.scala:234) at org.apache.spark.rdd.PartitionCoalescer.run(CoalescedRDD.scala:333) at org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:81) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:31) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.RDD.take(RDD.scala:830) at org.apache.spark.api.java.JavaRDDLike$class.take(JavaRDDLike.scala:337) at org.apache.spark.api.java.JavaRDD.take(JavaRDD.scala:27
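[Editor's note] For context, RDD.repartition / coalesce are the operations that build CoalescedRDDs under the hood, so Michael's guess is a reasonable one. A sketch of the workaround, assuming a DStream.repartition call is the culprit (the stream and split logic are illustrative placeholders):
===
import org.apache.spark.streaming.dstream.DStream

// 'lines' stands in for the Kafka-backed DStream from the thread.
def withoutCoalesce(lines: DStream[String]): DStream[String] = {
  // lines.repartition(8) would insert a CoalescedRDD into every batch;
  // skipping it keeps the receiver's original partitioning and sidesteps
  // the DAGScheduler race, at the cost of less parallelism downstream.
  lines.flatMap(_.split(" "))
}
===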
Re: Interactive modification of DStreams
Currently Spark Streaming does not support addition/deletion/modification of DStreams after the streaming context has been started, nor can you restart a stopped streaming context. Also, multiple Spark contexts (and therefore multiple streaming contexts) cannot be run concurrently in the same JVM. To change the window duration, I would do one of the following. 1. Stop the previous streaming context, create a new streaming context, and set up the DStreams once again with the new window duration. 2. Create a custom DStream, say DynamicWindowDStream. Take a look at how WindowedDStream https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala is implemented (pretty simple, just a union over RDDs across time). That should allow you to modify the window duration. However, do make sure you have a maximum window duration that you will never exceed, and make sure you define parentRememberDuration https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala#L53 as rememberDuration + maxWindowDuration. That field defines which RDDs can be forgotten, so it is sensitive to the window duration. Then you have to take care of correctly (atomically, etc.) modifying the window duration as per your requirements. Happy streaming! TD On Mon, Jun 2, 2014 at 2:46 PM, lbustelo g...@bustelos.com wrote: This is a general question about whether Spark Streaming can be interactive like batch Spark jobs. I've read plenty of threads and done my fair bit of experimentation, and I'm thinking the answer is NO, but it does not hurt to ask. More specifically, I would like to be able to: 1. Add/remove steps in the streaming job. 2. Modify window durations. 3. Stop and restart the context. I've tried the following: 1. Modify the DStream after it has been started… BOOM! Exceptions everywhere. 2. Stop the DStream, make modifications, start… NOT GOOD :( In 0.9.0 I was getting deadlocks. I also tried 1.0.0 and it did not work. 3. Based on information provided here http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3371.html , I have been able to prototype modifying the RDD computation within a foreachRDD. That is nice, but you are then bound to the specified batch size. That got me to wanting to modify window durations. Is changing the window duration possible? 4. Tried running multiple streaming contexts from within a single driver application and got several exceptions. The first one was a bind exception on the web port. Then once the app started getting run (cores were taken by the 1st job) it did not run correctly: a lot of akka.pattern.AskTimeoutException: Timed out. I've tried my experiments in 0.9.0, 0.9.1 and 1.0.0 running on a standalone cluster setup. Thanks in advance
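[Editor's note] A sketch of TD's option 1, tearing the old context down and rebuilding the DStream graph with a new window duration. The source, port, and print() sink are illustrative:
===
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("DynamicWindow")

def startWithWindow(windowSecs: Int): StreamingContext = {
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.socketTextStream("localhost", 9999)
    .window(Seconds(windowSecs), Seconds(4))
    .print()
  ssc.start()
  ssc
}

var ssc = startWithWindow(8)
// ... later, to switch to a 16-second window:
ssc.stop()                 // a stopped context cannot be restarted
ssc = startWithWindow(16)  // build a fresh context with the new duration
===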
Re: Window slide duration
I am assuming that you are referring to the "OneForOneStrategy: key not found: 1401753992000 ms" error, and not to the previous "Time 1401753992000 ms is invalid". Those two seem a little unrelated to me. Can you give us the stack trace associated with the key-not-found error? TD On Mon, Jun 2, 2014 at 5:22 PM, Vadim Chekan kot.bege...@gmail.com wrote: Hi all, I am getting an error: 14/06/02 17:06:32 INFO WindowedDStream: Time 1401753992000 ms is invalid as zeroTime is 1401753986000 ms and slideDuration is 4000 ms and difference is 6000 ms 14/06/02 17:06:32 ERROR OneForOneStrategy: key not found: 1401753992000 ms My relevant code is: === ssc = new StreamingContext(conf, Seconds(1)) val messageEvents = events. flatMap(e => evaluatorCached.value.find(e)). window(Seconds(8), Seconds(4)) messageEvents.print() === Seems all right to me; the window slide duration (4s) is a multiple of the streaming context batch duration (1s). So, what's the problem? Spark-v1.0.0 -- From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is explicitly specified
Re: NoSuchElementException: key not found
Do you have the info level logs of the application? Can you grep the value 32855 to find any references to it? Also, what version of Spark are you using (so that I can match the stack trace; it does not seem to match Spark 1.0)? TD On Mon, Jun 2, 2014 at 3:27 PM, Michael Chang m...@tellapart.com wrote: Hi all, I'm seeing a random exception kill my Spark Streaming job. Here's a stack trace: java.util.NoSuchElementException: key not found: 32855 at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.mutable.HashMap.apply(HashMap.scala:64) at org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:211) at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1072) at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:716) at org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:172) at org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:189) at org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:188) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:351) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350) at org.apache.spark.rdd.PartitionCoalescer$LocationIterator.init(CoalescedRDD.scala:183) at org.apache.spark.rdd.PartitionCoalescer.setupGroups(CoalescedRDD.scala:234) at org.apache.spark.rdd.PartitionCoalescer.run(CoalescedRDD.scala:333) at org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:81) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:31) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at org.apache.spark.rdd.RDD.take(RDD.scala:830) at org.apache.spark.api.java.JavaRDDLike$class.take(JavaRDDLike.scala:337) at org.apache.spark.api.java.JavaRDD.take(JavaRDD.scala:27) at com.tellapart.manifolds.spark.ManifoldsUtil$PersistToKafkaFunction.call(ManifoldsUtil.java:87) at 
com.tellapart.manifolds.spark.ManifoldsUtil$PersistToKafkaFunction.call(ManifoldsUtil.java:53) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:270) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:270) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:520) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:520) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:155)
Re: Window slide duration
Can you give all the logs? I would like to see what is clearing the key 1401754908000 ms. TD On Mon, Jun 2, 2014 at 5:38 PM, Vadim Chekan kot.bege...@gmail.com wrote: Ok, it seems like "Time ... is invalid" is part of the normal workflow: the window DStream will ignore RDDs at moments in time that do not match the window sliding interval. But why I am getting an exception is still unclear. Here is the full stack: 14/06/02 17:21:48 INFO WindowedDStream: Time 1401754908000 ms is invalid as zeroTime is 1401754907000 ms and slideDuration is 4000 ms and difference is 1000 ms 14/06/02 17:21:48 ERROR OneForOneStrategy: key not found: 1401754908000 ms java.util.NoSuchElementException: key not found: 1401754908000 ms at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.mutable.HashMap.apply(HashMap.scala:64) at org.apache.spark.streaming.dstream.ReceiverInputDStream.getReceivedBlockInfo(ReceiverInputDStream.scala:77) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:225) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:223) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:223) at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) On Mon, Jun 2, 2014 at 5:22 PM, Vadim Chekan kot.bege...@gmail.com wrote: Hi all, I am getting an error: 14/06/02 17:06:32 INFO WindowedDStream: Time 1401753992000 ms is invalid as zeroTime is 1401753986000 ms and slideDuration is 4000 ms and difference is 6000 ms 14/06/02 17:06:32 ERROR OneForOneStrategy: key not found: 1401753992000 ms My relevant code is: === ssc = new StreamingContext(conf, Seconds(1)) val messageEvents = events. flatMap(e => evaluatorCached.value.find(e)). window(Seconds(8), Seconds(4)) messageEvents.print() === Seems all right to me; the window slide duration (4s) is a multiple of the streaming context batch duration (1s). So, what's the problem? Spark-v1.0.0 -- From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is explicitly specified -- From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is explicitly specified
Re: Unable to run a Standalone job
How are you launching the application? sbt run? spark-submit? Local mode or a Spark standalone cluster? Are you packaging all your code into a jar? It looks to me like you have Spark classes in your execution environment but are missing some of Spark's dependencies. TD On Thu, May 22, 2014 at 2:27 PM, Shrikar archak shrika...@gmail.com wrote: Hi All, I am trying to run the network count example as a separate standalone job and running into some issues. Environment: 1) Mac Mavericks 2) Latest Spark repo from GitHub. I have a structure like this: Shrikars-MacBook-Pro:SimpleJob shrikar$ find . . ./simple.sbt ./src ./src/main ./src/main/scala ./src/main/scala/NetworkWordCount.scala ./src/main/scala/SimpleApp.scala.bk simple.sbt: name := "Simple Project" version := "1.0" scalaVersion := "2.10.3" libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.0.0-SNAPSHOT", "org.apache.spark" %% "spark-streaming" % "1.0.0-SNAPSHOT") resolvers += "Akka Repository" at "http://repo.akka.io/releases/" I am able to run the SimpleApp which is mentioned in the doc, but when I try to run the NetworkWordCount app I get an error like this. Am I missing something? [info] Running com.shrikar.sparkapps.NetworkWordCount 14/05/22 14:26:47 INFO spark.SecurityManager: Changing view acls to: shrikar 14/05/22 14:26:47 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(shrikar) 14/05/22 14:26:48 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/05/22 14:26:48 INFO Remoting: Starting remoting 14/05/22 14:26:48 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@192.168.10.88:49963] 14/05/22 14:26:48 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@192.168.10.88:49963] 14/05/22 14:26:48 INFO spark.SparkEnv: Registering MapOutputTracker 14/05/22 14:26:48 INFO spark.SparkEnv: Registering BlockManagerMaster 14/05/22 14:26:48 INFO storage.DiskBlockManager: Created local directory at /var/folders/r2/mbj08pb55n5d_9p8588xk5b0gn/T/spark-local-20140522142648-0a14 14/05/22 14:26:48 INFO storage.MemoryStore: MemoryStore started with capacity 911.6 MB. 
14/05/22 14:26:48 INFO network.ConnectionManager: Bound socket to port 49964 with id = ConnectionManagerId(192.168.10.88,49964) 14/05/22 14:26:48 INFO storage.BlockManagerMaster: Trying to register BlockManager 14/05/22 14:26:48 INFO storage.BlockManagerInfo: Registering block manager 192.168.10.88:49964 with 911.6 MB RAM 14/05/22 14:26:48 INFO storage.BlockManagerMaster: Registered BlockManager 14/05/22 14:26:48 INFO spark.HttpServer: Starting HTTP Server [error] (run-main) java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse at org.apache.spark.HttpServer.start(HttpServer.scala:54) at org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:156) at org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127) at org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31) at org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48) at org.apache.spark.broadcast.BroadcastManager.init(BroadcastManager.scala:35) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218) at org.apache.spark.SparkContext.init(SparkContext.scala:202) at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:549) at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:561) at org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:91) at com.shrikar.sparkapps.NetworkWordCount$.main(NetworkWordCount.scala:39) at com.shrikar.sparkapps.NetworkWordCount.main(NetworkWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) Thanks, Shrikar
Re: How to turn off MetadataCleaner?
The cleaner should remain up while the SparkContext is still active (not stopped). However, since here you are stopping the SparkContext (ssc.stop(true)), the cleaner should be stopped as well. There was a bug earlier where some of the cleaners may not have been stopped when the context is stopped. What version are you using? If it is 0.9.1, I can see that the cleaner in ShuffleBlockManager https://github.com/apache/spark/blob/v0.9.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala is not stopped, so it is a bug. TD On Thu, May 22, 2014 at 9:24 AM, Adrian Mocanu amoc...@verticalscope.com wrote: Hi, After using Spark's TestSuiteBase to run some tests I've noticed that at the end, after finishing all tests, the cleaner is still running and outputs the following periodically: INFO o.apache.spark.util.MetadataCleaner - Ran metadata cleaner for SHUFFLE_BLOCK_MANAGER I use method testOperation and I've changed it so that it stores the pointer to ssc after running setupStreams, then uses that pointer to turn things off, but the cleaner remains up. How do I shut down all of Spark, including the cleaner? Here is how I changed the testOperation method (changes in bold): def testOperation[U: ClassTag, V: ClassTag]( input: Seq[Seq[U]], operation: DStream[U] => DStream[V], expectedOutput: Seq[Seq[V]], numBatches: Int, useSet: Boolean ) { val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size val ssc = setupStreams[U, V](input, operation) val output = runStreams[V](ssc, numBatches_, expectedOutput.size) verifyOutput[V](output, expectedOutput, useSet) ssc.awaitTermination(500) ssc.stop(true) } -Adrian
Re: Unable to run a Standalone job
How are you getting Spark with 1.0.0-SNAPSHOT through Maven? Did you publish Spark locally, which allowed you to use it as a dependency? This is weird indeed. sbt should take care of all the dependencies of Spark. In any case, you can try the last released Spark 0.9.1 and see if the problem persists. On Thu, May 22, 2014 at 3:59 PM, Shrikar archak shrika...@gmail.com wrote: I am running it locally with sbt run. Thanks, Shrikar On Thu, May 22, 2014 at 3:53 PM, Tathagata Das tathagata.das1...@gmail.com wrote: How are you launching the application? sbt run? spark-submit? Local mode or a Spark standalone cluster? Are you packaging all your code into a jar? It looks to me like you have Spark classes in your execution environment but are missing some of Spark's dependencies. TD On Thu, May 22, 2014 at 2:27 PM, Shrikar archak shrika...@gmail.com wrote: Hi All, I am trying to run the network count example as a separate standalone job and running into some issues. Environment: 1) Mac Mavericks 2) Latest Spark repo from GitHub. I have a structure like this: Shrikars-MacBook-Pro:SimpleJob shrikar$ find . . ./simple.sbt ./src ./src/main ./src/main/scala ./src/main/scala/NetworkWordCount.scala ./src/main/scala/SimpleApp.scala.bk simple.sbt: name := "Simple Project" version := "1.0" scalaVersion := "2.10.3" libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.0.0-SNAPSHOT", "org.apache.spark" %% "spark-streaming" % "1.0.0-SNAPSHOT") resolvers += "Akka Repository" at "http://repo.akka.io/releases/" I am able to run the SimpleApp which is mentioned in the doc, but when I try to run the NetworkWordCount app I get an error like this. Am I missing something? [info] Running com.shrikar.sparkapps.NetworkWordCount 14/05/22 14:26:47 INFO spark.SecurityManager: Changing view acls to: shrikar 14/05/22 14:26:47 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(shrikar) 14/05/22 14:26:48 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/05/22 14:26:48 INFO Remoting: Starting remoting 14/05/22 14:26:48 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@192.168.10.88:49963] 14/05/22 14:26:48 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@192.168.10.88:49963] 14/05/22 14:26:48 INFO spark.SparkEnv: Registering MapOutputTracker 14/05/22 14:26:48 INFO spark.SparkEnv: Registering BlockManagerMaster 14/05/22 14:26:48 INFO storage.DiskBlockManager: Created local directory at /var/folders/r2/mbj08pb55n5d_9p8588xk5b0gn/T/spark-local-20140522142648-0a14 14/05/22 14:26:48 INFO storage.MemoryStore: MemoryStore started with capacity 911.6 MB. 
14/05/22 14:26:48 INFO network.ConnectionManager: Bound socket to port 49964 with id = ConnectionManagerId(192.168.10.88,49964) 14/05/22 14:26:48 INFO storage.BlockManagerMaster: Trying to register BlockManager 14/05/22 14:26:48 INFO storage.BlockManagerInfo: Registering block manager 192.168.10.88:49964 with 911.6 MB RAM 14/05/22 14:26:48 INFO storage.BlockManagerMaster: Registered BlockManager 14/05/22 14:26:48 INFO spark.HttpServer: Starting HTTP Server [error] (run-main) java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse at org.apache.spark.HttpServer.start(HttpServer.scala:54) at org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:156) at org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127) at org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31) at org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48) at org.apache.spark.broadcast.BroadcastManager.init(BroadcastManager.scala:35) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218) at org.apache.spark.SparkContext.init(SparkContext.scala:202) at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:549) at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:561) at org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:91) at com.shrikar.sparkapps.NetworkWordCount$.main(NetworkWordCount.scala:39) at com.shrikar.sparkapps.NetworkWordCount.main(NetworkWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) Thanks, Shrikar
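[Editor's note] TD's suggestion of pinning to the released version would make the user's simple.sbt look roughly like this (same file as above, only the Spark version changed):
===
name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.3"

// Released artifacts instead of the locally published 1.0.0-SNAPSHOT.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "0.9.1",
  "org.apache.spark" %% "spark-streaming" % "0.9.1"
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
===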
Re: any way to control memory usage when streaming input's speed is faster than the speed at which it is handled by spark streaming?
Unfortunately, there is no API support for this right now. You could implement it yourself by writing your own receiver and controlling the rate at which objects are received. If you are using any of the standard receivers (Flume, Kafka, etc.), I recommend looking at the source code of the corresponding receiver and making your own version of it. Alternatively, there is an open JIRA (https://issues.apache.org/jira/browse/SPARK-1341) about implementing this functionality. You could give it a shot at implementing this in a generic way so that it can be used for all receivers ;) On Tue, May 20, 2014 at 8:31 PM, Francis.Hu francis...@reachjunction.com wrote: sparkers, Is there a better way to control memory usage when the streaming input's speed is faster than the speed at which it is handled by Spark Streaming? Thanks, Francis.Hu
Re: any way to control memory usage when streaming input's speed is faster than the speed at which it is handled by spark streaming?
Apologies for the premature send. Unfortunately, there is no API support for this right now. You could implement it yourself by writing your own receiver and controlling the rate at which objects are received. If you are using any of the standard receivers (Flume, Kafka, etc.), I recommend looking at the source code of the corresponding receiver and making your own version of the Flume receiver / Kafka receiver. Alternatively, there is an open JIRA (https://issues.apache.org/jira/browse/SPARK-1341) about implementing this functionality. You could give it a shot at implementing this in a generic way so that it can be used for all receivers ;) TD On Wed, May 21, 2014 at 12:57 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Unfortunately, there is no API support for this right now. You could implement it yourself by writing your own receiver and controlling the rate at which objects are received. If you are using any of the standard receivers (Flume, Kafka, etc.), I recommend looking at the source code of the corresponding receiver and making your own version of it. Alternatively, there is an open JIRA (https://issues.apache.org/jira/browse/SPARK-1341) about implementing this functionality. You could give it a shot at implementing this in a generic way so that it can be used for all receivers ;) On Tue, May 20, 2014 at 8:31 PM, Francis.Hu francis...@reachjunction.com wrote: sparkers, Is there a better way to control memory usage when the streaming input's speed is faster than the speed at which it is handled by Spark Streaming? Thanks, Francis.Hu
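[Editor's note] A concrete sketch of the do-it-yourself approach described above, written against the Spark 1.0 receiver API. The rate limit and the fetchRecord() source are illustrative placeholders, not a real receiver:
===
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver that caps ingestion at maxRecordsPerSec.
class ThrottledReceiver(maxRecordsPerSec: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    new Thread("Throttled Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {}

  private def receive(): Unit = {
    while (!isStopped()) {
      val windowStart = System.currentTimeMillis()
      var sent = 0
      while (sent < maxRecordsPerSec && !isStopped()) {
        store(fetchRecord())   // fetchRecord() stands in for your real source
        sent += 1
      }
      // Sleep out the remainder of the one-second window before resuming.
      val elapsed = System.currentTimeMillis() - windowStart
      if (elapsed < 1000) Thread.sleep(1000 - elapsed)
    }
  }

  private def fetchRecord(): String = ???  // hypothetical blocking read
}
===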
Re: tests that run locally fail when run through bamboo
This does happen sometimes, but it is only a warning, because Spark is designed to try successive ports until it succeeds. So unless a crazy number of successive ports are blocked (runaway processes?? insufficient clearing of ports by the OS??), these errors should not be a problem for tests passing. On Wed, May 21, 2014 at 2:31 PM, Adrian Mocanu amoc...@verticalscope.com wrote: Just found this at the top of the log: 17:14:41.124 [pool-7-thread-3-ScalaTest-running-StreamingSpikeSpec] WARN o.e.j.u.component.AbstractLifeCycle - FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use build 21-May-2014 17:14:41 java.net.BindException: Address already in use Is there a way to set these connections up so that they don't all start on the same port (that's my guess for the root cause of the issue)? *From:* Adrian Mocanu [mailto:amoc...@verticalscope.com] *Sent:* May-21-14 4:58 PM *To:* u...@spark.incubator.apache.org; user@spark.apache.org *Subject:* tests that run locally fail when run through bamboo I have a few test cases for Spark which extend TestSuiteBase from org.apache.spark.streaming. The tests run fine on my machine, but when I commit to the repo and the tests run automatically with Bamboo, the test cases fail with these errors. How can I fix this? 21-May-2014 16:33:09 [info] StreamingZigZagSpec: 21-May-2014 16:33:09 [info] - compute zigzag indicator in stream *** FAILED *** 21-May-2014 16:33:09 [info] org.apache.spark.SparkException: Job aborted: Task 1.0:1 failed 1 times (most recent failure: Exception failure: java.io.StreamCorruptedException: invalid type code: AC) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018) 21-May-2014 16:33:09 [info] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 21-May-2014 16:33:09 [info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) 21-May-2014 16:33:09 [info] at scala.Option.foreach(Option.scala:236) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190) 21-May-2014 16:33:09 [info] ... 
21-May-2014 16:33:09 [info] - compute zigzag indicator in stream with intermittent empty RDDs *** FAILED *** 21-May-2014 16:33:09 [info] Operation timed out after 10042 ms (TestSuiteBase.scala:283) 21-May-2014 16:33:09 [info] - compute zigzag indicator in stream with 3 empty RDDs *** FAILED *** 21-May-2014 16:33:09 [info] org.apache.spark.SparkException: Job aborted: Task 1.0:1 failed 1 times (most recent failure: Exception failure: java.io.FileNotFoundException: /tmp/spark-local-20140521163241-1707/0f/shuffle_1_1_1 (No such file or directory)) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018) 21-May-2014 16:33:09 [info] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 21-May-2014 16:33:09 [info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) 21-May-2014 16:33:09 [info] at scala.Option.foreach(Option.scala:236) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604) 21-May-2014 16:33:09 [info] at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190) 21-May-2014 16:33:09 [info] ...
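[Editor's note] One way to act on Adrian's guess is to give each test run its own UI port rather than the default 4040. A sketch, assuming the tests construct their own SparkConf (the property name spark.ui.port is standard; the offset range is arbitrary):
===
import org.apache.spark.SparkConf
import scala.util.Random

// Scatter the Spark UI port so concurrent test JVMs on the same build
// agent don't all race for 4040.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("StreamingSpikeSpec")
  .set("spark.ui.port", (4040 + Random.nextInt(1000)).toString)
===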
Re: I want to filter a stream by a subclass.
You could do records.filter { _.isInstanceOf[Orange] }.map { _.asInstanceOf[Orange] } On Wed, May 21, 2014 at 3:28 PM, Ian Holsman i...@holsman.com.au wrote: Hi. Firstly, I'm a newb (to both Scala and Spark). I have a stream that contains multiple types of records, and I would like to create multiple streams based on that. Currently I have it set up as: class ALL class Orange extends ALL class Apple extends ALL Now I can easily add a filter a la val records: DStream[ALL] = ...mapper to build the classes off the wire... val orangeRecords = records.filter { _.isInstanceOf[Orange] } but I would like to have the result be a DStream[Orange] instead of a DStream[ALL] (so I can access the unique fields in the subclass). I'm using 0.9.1 if it matters. TIA Ian -- Ian Holsman i...@holsman.com.au PH: +61-3-9028 8133 / +1-(425) 998-7083
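[Editor's note] An equivalent sketch using a pattern match, so the type test and the cast happen in one place and the result is typed DStream[Orange]. 'records' is the DStream[ALL] from the question:
===
import org.apache.spark.streaming.dstream.DStream

val orangeRecords: DStream[Orange] = records.flatMap {
  case o: Orange => Some(o)  // keep Oranges, now statically typed
  case _         => None     // drop everything else
}
===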
Re: Failed RC-10 yarn-cluster job for FS closed error when cleaning up staging directory
Are you running a vanilla Hadoop 2.3.0 or the one that comes with CDH5 / HDP? We may be able to reproduce this in that case. TD On Wed, May 21, 2014 at 8:35 PM, Tom Graves tgraves...@yahoo.com wrote: It sounds like something is closing the HDFS filesystem before everyone is really done with it. The filesystem gets cached and is shared, so if someone closes it while other threads are still using it, you run into this error. Is your application closing the filesystem? Are you using the event logging feature? Could you share the options you are running with? YARN will retry the application depending on how the Application Master attempt fails (this is a configurable setting as to how many times it retries). That is probably the second driver you are referring to, but they shouldn't have overlapped as far as both being up at the same time. Is that the case you are seeing? Generally you want to look at why the first application attempt fails. Tom On Wednesday, May 21, 2014 6:10 PM, Kevin Markey kevin.mar...@oracle.com wrote: I tested an application on RC-10 and Hadoop 2.3.0 in yarn-cluster mode that had run successfully with Spark-0.9.1 and Hadoop 2.3 or 2.2. The application successfully ran to conclusion but it ultimately failed. There were 2 anomalies... 1. ASM reported only that the application was ACCEPTED. It never indicated that the application was RUNNING. 14/05/21 16:06:12 INFO yarn.Client: Application report from ASM: application identifier: application_1400696988985_0007 appId: 7 clientToAMToken: null appDiagnostics: appMasterHost: N/A appQueue: default appMasterRpcPort: -1 appStartTime: 1400709970857 yarnAppState: ACCEPTED distributedFinalState: UNDEFINED appTrackingUrl: http://sleepycat:8088/proxy/application_1400696988985_0007/ appUser: hduser Furthermore, it *started a second container*, running two partly *overlapping* drivers, when it appeared that the application never started. Each container ran to conclusion as explained above, taking twice as long as usual for both to complete. Both instances had the same concluding failure. 2. Each instance failed as indicated by the stderr log, finding that the *filesystem was closed* when trying to clean up the staging directories. 14/05/21 16:08:24 INFO Executor: Serialized size of result for 1453 is 863 14/05/21 16:08:24 INFO Executor: Sending result for 1453 directly to driver 14/05/21 16:08:24 INFO Executor: Finished task ID 1453 14/05/21 16:08:24 INFO TaskSetManager: Finished TID 1453 in 202 ms on localhost (progress: 2/2) 14/05/21 16:08:24 INFO DAGScheduler: Completed ResultTask(1507, 1) 14/05/21 16:08:24 INFO TaskSchedulerImpl: Removed TaskSet 1507.0, whose tasks have all completed, from pool 14/05/21 16:08:24 INFO DAGScheduler: Stage 1507 (count at KEval.scala:32) finished in 0.417 s 14/05/21 16:08:24 INFO SparkContext: Job finished: count at KEval.scala:32, took 1.532789283 s 14/05/21 16:08:24 INFO SparkUI: Stopped Spark web UI at http://dhcp-brm-bl1-215-1e-east-10-135-123-92.usdhcp.oraclecorp.com:42250 14/05/21 16:08:24 INFO DAGScheduler: Stopping DAGScheduler 14/05/21 16:08:25 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped! 14/05/21 16:08:25 INFO ConnectionManager: Selector thread was interrupted! 
14/05/21 16:08:25 INFO ConnectionManager: ConnectionManager stopped 14/05/21 16:08:25 INFO MemoryStore: MemoryStore cleared 14/05/21 16:08:25 INFO BlockManager: BlockManager stopped 14/05/21 16:08:25 INFO BlockManagerMasterActor: Stopping BlockManagerMaster 14/05/21 16:08:25 INFO BlockManagerMaster: BlockManagerMaster stopped 14/05/21 16:08:25 INFO SparkContext: Successfully stopped SparkContext 14/05/21 16:08:25 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 14/05/21 16:08:25 INFO ApplicationMaster: *finishApplicationMaster with SUCCEEDED* 14/05/21 16:08:25 INFO ApplicationMaster: AppMaster received a signal. 14/05/21 16:08:25 INFO ApplicationMaster: Deleting staging directory .sparkStaging/application_1400696988985_0007 14/05/21 16:08:25 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. 14/05/21 16:08:25 ERROR *ApplicationMaster: Failed to cleanup staging dir .sparkStaging/application_1400696988985_0007* *java.io.IOException: Filesystem closed* at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:689) at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:1685) at org.apache.hadoop.hdfs.DistributedFileSystem$11.doCall(DistributedFileSystem.java:591) at org.apache.hadoop.hdfs.DistributedFileSystem$11.doCall(DistributedFileSystem.java:587) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at
Re: question about the license of akka and Spark
Akka is under the Apache 2 license too: http://doc.akka.io/docs/akka/snapshot/project/licenses.html On Tue, May 20, 2014 at 2:16 AM, YouPeng Yang yypvsxf19870...@gmail.com wrote: Hi, I just learned that Akka is under a commercial license; however, Spark is under the Apache license. Is there any problem? Regards
Re: life if an executor
That's one of the main motivations for using Tachyon ;) http://tachyon-project.org/ It gives off-heap in-memory caching. And starting with Spark 0.9, you can cache any RDD in Tachyon just by specifying the appropriate StorageLevel. TD On Mon, May 19, 2014 at 10:22 PM, Mohit Jaggi mohitja...@gmail.com wrote: I guess it needs to be this way to benefit from caching of RDDs in memory. It would be nice, however, if the RDD cache could be dissociated from the JVM heap, so that in cases where garbage collection is difficult to tune, one could choose to discard the JVM and run the next operation in a new one. On Mon, May 19, 2014 at 10:06 PM, Matei Zaharia matei.zaha...@gmail.com wrote: They're tied to the SparkContext (application) that launched them. Matei On May 19, 2014, at 8:44 PM, Koert Kuipers ko...@tresata.com wrote: From looking at the source code I see executors run in their own JVM subprocesses. How long do they live for? As long as the worker/slave? Or are they tied to the SparkContext and live/die with it? thx
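[Editor's note] A sketch of what that looks like from the RDD API. The OFF_HEAP level name is the Spark 1.0-era spelling for Tachyon-backed storage; earlier versions exposed it slightly differently, so treat the constant (and the paths) as assumptions:
===
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext("local[2]", "TachyonCacheSketch")
val lines = sc.textFile("hdfs:///data/input")   // illustrative path
// OFF_HEAP keeps the cached blocks in Tachyon, outside the executor JVM
// heap, so they are unaffected by GC pauses in the executor.
lines.persist(StorageLevel.OFF_HEAP)
lines.count()
===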
Re: Equivalent of collect() on DStream
Doesn't DStream.foreach() suffice? anyDStream.foreach { rdd => // do something with rdd } On Wed, May 14, 2014 at 9:33 PM, Stephen Boesch java...@gmail.com wrote: Looking further, it appears the functionality I am seeking is in the following private[streaming] class ForEachDStream (version 0.8.1; yes, we are presently using an older release): private[streaming] class ForEachDStream[T: ClassManifest] ( parent: DStream[T], foreachFunc: (RDD[T], Time) => Unit ) extends DStream[Unit](parent.ssc) { I would like to have access to this structure, particularly the ability to define a foreachFunc that gets applied to each RDD within the DStream. Is there a means to do so? 2014-05-14 21:25 GMT-07:00 Stephen Boesch java...@gmail.com: Given that collect() does not exist on DStream, apparently my mental model of the streaming RDD (DStream) needs correction/refinement. So what is the means to convert DStream data into an in-memory JVM representation? All of the methods on DStream, i.e. filter, map, transform, reduce, etc., generate other DStreams, not an in-memory data structure.
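[Editor's note] The (RDD[T], Time) => Unit shape Stephen is after is also exposed publicly: DStream.foreachRDD (called foreach in 0.8.x) has a two-argument variant. A sketch with an illustrative body:
===
anyDStream.foreachRDD { (rdd, time) =>
  // 'time' is the batch time for this RDD; collect() pulls the batch's
  // data into driver memory as a local Array.
  val localData = rdd.collect()
  println("Batch at " + time + " has " + localData.length + " records")
}
===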
Re: missing method in my slf4j after excluding Spark ZK log4j
This gives a dependency tree in sbt (Spark uses sbt): https://github.com/jrudolph/sbt-dependency-graph TD On Mon, May 12, 2014 at 4:55 PM, Sean Owen so...@cloudera.com wrote: It sounds like you are doing everything right. NoSuchMethodError suggests it's finding log4j, just not the right version. That method is definitely in 1.2; it might have been removed in 2.x (http://logging.apache.org/log4j/2.x/manual/migration.html). So I wonder if something is sneaking log4j 2.x into your app? That's a first guess. I'd say consult mvn dependency:tree, but you're on sbt and I don't know the equivalent. On Mon, May 12, 2014 at 3:51 PM, Adrian Mocanu amoc...@verticalscope.com wrote: Hey guys, I've asked before, in Spark 0.9 - I now use 0.9.1 - about removing the log4j dependency and was told that it was gone. However, I still find it as part of the zookeeper imports. This is fine since I exclude it myself in the sbt file, but another issue arises. I wonder if anyone else has run into this. Spark uses log4j v1.2.17 and slf4j-log4j12:1.7.2. I use slf4j 1.7.5, logback 1.0.13, and log4j-over-slf4j v1.7.5. I think my slf4j 1.7.5 doesn't agree with what zookeeper expects in its log4j v1.2.17, because I get a missing method error: java.lang.NoSuchMethodError: org.apache.log4j.Logger.setLevel(Lorg/apache/log4j/Level;)V at org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58) at org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58) at scala.Option.map(Option.scala:145) at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:58) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:126) at org.apache.spark.SparkContext.init(SparkContext.scala:139) at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:500) at org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:76) ... Is there a way to find out what versions of slf4j I need to make it work with log4j 1.2.17? -Adrian
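[Editor's note] Wiring that plugin into an sbt build looks roughly like this (coordinates from the plugin's README; the version number is an assumption and may need updating for your sbt version):
===
// project/plugins.sbt
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4")
===
After reloading, a task such as dependency-tree (the plugin's analogue of mvn dependency:tree) prints the resolved dependency tree.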
Re: streaming on hdfs can detected all new file, but the sum of all the rdd.count() not equals which had detected
A very crucial thing to remember when using file streams is that the files must be written to the monitored directory atomically. That is, when the file system shows the file in its listing, the file should not be appended to / updated after that. This often causes these kinds of issues, as Spark Streaming may pick up the file (soon after it is visible in the listing) and try to process it even before all of the data has been written. So the best way to feed data into Spark Streaming is to write the file to a temp dir and then move / rename it into the monitored directory. That makes it atomic. This is mentioned in the API docs of fileStream (http://spark.apache.org/docs/0.9.1/api/streaming/index.html#org.apache.spark.streaming.StreamingContext). TD On Sun, May 11, 2014 at 7:30 PM, zqf12345 zqf12...@gmail.com wrote: When I put 200 png files into HDFS, I found Spark Streaming could detect 200 files, but the sum of rdd.count() is less than 200, always between 130 and 170. I don't know why... Is this a bug? PS: When I put the 200 files in HDFS before the streaming job ran, it got the correct count and the right result. Here is the code: def main(args: Array[String]) { val conf = new SparkConf().setMaster(SparkURL) .setAppName("QimageStreaming-broadcast") .setSparkHome(System.getenv("SPARK_HOME")) .setJars(SparkContext.jarOfClass(this.getClass())) conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.set("spark.kryo.registrator", "qing.hdu.Image.MyRegistrator") conf.set("spark.kryoserializer.buffer.mb", "10") val ssc = new StreamingContext(conf, Seconds(2)) val inputFormatClass = classOf[QimageInputFormat[Text, Qimage]] val outputFormatClass = classOf[QimageOutputFormat[Text, Qimage]] val input_path = HdfsURL + "/Qimage/input" val output_path = HdfsURL + "/Qimage/output/" val bg_path = HdfsURL + "/Qimage/bg/" val bg = ssc.sparkContext.newAPIHadoopFile[Text, Qimage, QimageInputFormat[Text, Qimage]](bg_path) val bbg = bg.map(data => (data._1.toString(), data._2)) val broadcastbg = ssc.sparkContext.broadcast(bbg) val file = ssc.fileStream[Text, Qimage, QimageInputFormat[Text, Qimage]](input_path) val qingbg = broadcastbg.value.collectAsMap val foreachFunc = (rdd: RDD[(Text, Qimage)], time: Time) => { val rddnum = rdd.count System.out.println("\n\n" + "rddnum is " + rddnum + "\n\n") if (rddnum > 0) { System.out.println("here is foreachFunc") val a = rdd.keys val b = a.first val cbg = qingbg.get(getbgID(b)).getOrElse(new Qimage) rdd.map(data => (data._1, (new QimageProc(data._1, data._2)).koutu(cbg))) .saveAsNewAPIHadoopFile(output_path, classOf[Text], classOf[Qimage], outputFormatClass) } } file.foreachRDD(foreachFunc) ssc.start() ssc.awaitTermination() }
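[Editor's note] A sketch of the temp-dir-then-rename pattern TD describes, using the Hadoop FileSystem API (paths are illustrative):
===
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val tmp = new Path("/Qimage/tmp/part-00000")     // outside the monitored dir
val dst = new Path("/Qimage/input/part-00000")   // the monitored dir
// ... fully write and close the file at 'tmp' here ...
// rename() is atomic in HDFS, so the file only ever appears in the
// monitored directory's listing fully written.
fs.rename(tmp, dst)
===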
Re: Average of each RDD in Stream
Use DStream.foreachRDD to do an operation on the final RDD of every batch. val sumandcount = numbers.map(n => (n.toDouble, 1)).reduce { (a, b) => (a._1 + b._1, a._2 + b._2) } sumandcount.foreachRDD { rdd => val first: (Double, Int) = rdd.take(1).head ; ... } DStream.reduce creates a DStream whose RDDs have just one tuple each. The rdd.take(1) above gets that one tuple. However, note that there is a corner case in this approach. If in a particular batch there is no data, then the RDD will have zero elements (no data, nothing to reduce). So you have to take that into account (maybe do an rdd.collect(), check the size, and then get the first / only element). TD On Wed, May 7, 2014 at 7:59 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi, I use the following code for calculating the average. The problem is that the reduce operation returns a DStream here, not a tuple as it normally does without streaming. So how can we get the sum and the count from the DStream? Can we cast it to a tuple? val numbers = ssc.textFileStream(args(1)) val sumandcount = numbers.map(n => (n.toDouble, 1)).reduce { (a, b) => (a._1 + b._1, a._2 + b._2) } sumandcount.print() Regards, Laeeq
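[Editor's note] Putting the pieces together, a sketch that also handles the empty-batch corner case TD mentions:
===
val sumAndCount = numbers.map(n => (n.toDouble, 1))
  .reduce { (a, b) => (a._1 + b._1, a._2 + b._2) }

sumAndCount.foreachRDD { rdd =>
  // reduce() leaves at most one tuple per batch; an empty batch leaves none.
  rdd.collect() match {
    case Array((sum, count)) => println("average = " + sum / count)
    case _                   => println("no data in this batch")
  }
}
===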
Re: Proper way to stop Spark stream processing
Since you are using the latest Spark code and not Spark 0.9.1 (guessed from the log messages), you can actually do a graceful shutdown of a streaming context. This ensures that the receivers are properly stopped and all received data is processed, and only then does the system terminate (stop() stays blocked until then). See the other variations of streamingContext.stop(). TD On Mon, May 12, 2014 at 2:49 AM, Tobias Pfeiffer t...@preferred.jp wrote: Hello, I am trying to implement something like "process a stream for N seconds, then return a result" with Spark Streaming (built from git head). My approach (which is probably not very elegant) is

val ssc = new StreamingContext(...)
ssc.start()
future {
  Thread.sleep(N * 1000)  // N seconds
  ssc.stop(true)
}
ssc.awaitTermination()

and in fact, this stops the stream processing. However, I get the following error messages: 14/05/12 18:41:49 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver 14/05/12 18:41:49 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Retrying connecting to localhost: 14/05/12 18:41:50 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found 14/05/12 18:41:50 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found (where localhost: is the source I am reading the stream from). This doesn't actually seem like the proper way to do it. Can anyone point me to how to implement "stop after N seconds" without these error messages? Thanks Tobias
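A sketch of the graceful variant TD points to, using the two-argument stop() from the 1.0-era API (N is a placeholder for the number of seconds):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

ssc.start()
Future {
  Thread.sleep(N * 1000L)
  // stopSparkContext = true, stopGracefully = true: blocks until the receivers
  // are stopped and all received data has been processed
  ssc.stop(true, true)
}
ssc.awaitTermination()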
Re: How to use spark-submit
Doesn't the run-example script work for you? Also, are you on the latest commit of branch-1.0? TD On Mon, May 5, 2014 at 7:51 PM, Soumya Simanta soumya.sima...@gmail.com wrote: Yes, I'm struggling with a similar problem where my classes are not found on the worker nodes. I'm using 1.0.0_SNAPSHOT. I would really appreciate if someone can provide some documentation on the usage of spark-submit. Thanks On May 5, 2014, at 10:24 PM, Stephen Boesch java...@gmail.com wrote: I have a spark streaming application that uses the external streaming modules (e.g. kafka, mqtt, ..) as well. It is not clear how to properly invoke the spark-submit script: what are the --driver-class-path and/or -Dspark.executor.extraClassPath parameters required? For reference, the following error is proving difficult to resolve: java.lang.ClassNotFoundException: org.apache.spark.streaming.examples.StreamingExamples
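For anyone hitting this, a sketch of a spark-submit invocation that ships an application jar plus extra streaming-module jars to the executors (the class name, master URL, and jar names are placeholders):

./bin/spark-submit \
  --class org.example.MyStreamingApp \
  --master spark://master-host:7077 \
  --jars spark-streaming-kafka_2.10-1.0.0.jar,spark-streaming-mqtt_2.10-1.0.0.jar \
  my-app-assembly.jar arg1 arg2

The --jars list is distributed to the workers, which avoids most ClassNotFoundException problems for classes outside the main application jar.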
Re: sbt run with spark.ContextCleaner ERROR
Okay, this needs to be fixed. Thanks for reporting this! On Mon, May 5, 2014 at 11:00 PM, wxhsdp wxh...@gmail.com wrote: Hi TD, I tried on v1.0.0-rc3 and still got the error -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sbt-run-with-spark-ContextCleaner-ERROR-tp5304p5421.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark streaming question
One main reason why Spark Streaming can achieve higher throughput than Storm is that Spark Streaming operates in coarser-grained batches - second-scale massive batches - which reduce per-tuple overheads in shuffles and other kinds of data movement. Note that this increased throughput does not come for free: larger batches mean larger end-to-end latency. Storm may give a lower end-to-end latency than Spark Streaming with its second-scale batches. However, we have observed that for a large variety of streaming use cases, people are often okay with second-scale latencies but find it much harder to work around the at-least-once semantics (double-counting, etc.) and the lack of built-in state management (state kept locally in a worker can get lost if the worker dies). Plus, Spark Streaming has the major advantage of a simpler, higher-level API than Storm, and the whole Spark ecosystem (Spark SQL, MLlib, etc.) around it that can be used for writing streaming analytics applications very easily. Regarding Trident, we have heard from many developers that Trident gives lower throughput than Storm due to its transactional guarantees. It's hard to say the reasons behind the performance penalty without doing a very detailed head-to-head analysis. TD On Sun, May 4, 2014 at 5:11 PM, Chris Fregly ch...@fregly.com wrote: great questions, weide. in addition, i'd also like to hear more about how to horizontally scale a spark-streaming cluster. i've gone through the samples (standalone mode) and read the documentation, but it's still not clear to me how to scale this puppy out under high load. i assume i add more receivers (kinesis, flume, etc), but physically how does this work? @TD: can you comment? thanks! -chris On Sun, May 4, 2014 at 2:10 PM, Weide Zhang weo...@gmail.com wrote: Hi, It might be a very general question to ask here but I'm curious to know why spark streaming can achieve better throughput than storm as claimed in the spark streaming paper. Does it depend on certain use cases and/or data sources? What drives better performance in the spark streaming case, or in other words, what makes storm not as performant as spark streaming? Also, in order to guarantee exactly-once semantics when node failure happens, spark makes replicas of RDDs and checkpoints so that data can be recomputed on the fly, while in the Trident case they use a transactional object to persist state and results, but it's not obvious to me which approach is more costly and why. Can anyone provide some experience here? Thanks a lot, Weide
Re: Spark Streaming and JMS
A few high-level suggestions. 1. I recommend using the new Receiver API in the almost-released Spark 1.0 (see branch-1.0 / master branch on github). It's a slightly better version of the earlier NetworkReceiver, as it hides away the block generator (which earlier had to be unnecessarily started and stopped manually) and adds other lifecycle management methods like stop, restart, and reportError to deal with errors in receiving data. It also adds the ability to write a custom receiver from Java. Take a look at this example (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala) of writing a custom receiver in the new API. I am updating the custom receiver guide right now (https://github.com/apache/spark/pull/652). 2. Once you create a JMSReceiver class by extending NetworkReceiver/Receiver, you can create a DStream out of the receiver with

val jmsStream = ssc.networkStream(new JMSReceiver())

3. As far as I understand from the docs of akka.camel.Consumer (http://doc.akka.io/api/akka/2.3.2/index.html#akka.camel.Consumer), it is essentially a specialized Akka actor. For Akka actors, there is ssc.actorStream, where you can specify your own actor class. You get actor supervision (and therefore error handling, etc.) with that. See the ActorWordCount example - old style using NetworkReceiver (https://github.com/apache/spark/blob/branch-0.9/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala), or new style using Receiver (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala). I haven't personally played around with JMS before, so I can't comment much on JMS-specific intricacies. TD On Mon, May 5, 2014 at 5:31 AM, Patrick McGloin mcgloin.patr...@gmail.com wrote: Hi all, Is there a best practice for subscribing to JMS with Spark Streaming? I have searched but not found anything conclusive. In the absence of a standard practice, the solution I was thinking of was to use Akka + Camel (akka.camel.Consumer) to create a subscription for a Spark Streaming custom receiver. So the actor would look something like this:

class JmsReceiver(jmsURI: String) extends NetworkReceiver[String] with Consumer {
  // e.g. jms:sonicmq://localhost:2506/queue?destination=SampleQ1
  def endpointUri = jmsURI
  lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY_SER)

  protected override def onStart() {
    blockGenerator.start
  }

  def receive = {
    case msg: CamelMessage => { blockGenerator += msg.body }
    case _ => { /* ... */ }
  }

  protected override def onStop() {
    blockGenerator.stop
  }
}

And then in the main application create receivers like this:

val ssc = new StreamingContext(...)
object tascQueue extends JmsReceiver[String](ssc) {
  override def getReceiver(): JmsReceiver[String] = {
    new JmsReceiver("jms:sonicmq://localhost:2506/queue?destination=TascQueue")
  }
}
ssc.registerInputStream(tascQueue)

Is this the best way to go? Best regards, Patrick
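For comparison, a sketch of roughly the same receiver written against the new Receiver API TD describes (untested; the javax.jms setup in connectToJms is a placeholder you would fill in for your broker):

import javax.jms.{MessageConsumer, TextMessage}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class JMSReceiver(brokerUrl: String, queueName: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  def onStart() {
    // spawn a thread so that onStart() returns immediately
    new Thread("JMS Receiver") {
      override def run() { receiveLoop() }
    }.start()
  }

  def onStop() { /* close connection/session here; the loop below checks isStopped() */ }

  private def receiveLoop() {
    try {
      val consumer: MessageConsumer = connectToJms(brokerUrl, queueName)
      while (!isStopped()) {
        consumer.receive(1000) match {             // poll with a timeout so isStopped() is re-checked
          case t: TextMessage => store(t.getText)  // hand the data to Spark Streaming
          case _ =>
        }
      }
    } catch {
      case t: Throwable => restart("Error receiving from JMS", t)
    }
  }

  // placeholder: create the javax.jms Connection/Session/MessageConsumer for your broker
  private def connectToJms(url: String, queue: String): MessageConsumer = ???
}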
Re: spark run issue
All the stuff in lib_managed is what gets downloaded by sbt/maven when you compile. Those jars are necessary for running Spark, Spark Streaming, etc., but you should not have to add all of them to the classpath individually and manually when running Spark programs. If you are trying to run your Spark program locally, you should use sbt or maven to compile your project with Spark as a dependency, and sbt/maven will take care of putting all the necessary jars on the classpath (when you run your program with sbt/maven). If you are trying to run your Spark program on a cluster, then refer to the deployment guide: http://spark.apache.org/docs/latest/cluster-overview.html. To run a Spark standalone cluster, you just have to compile Spark and place the whole Spark directory on the worker nodes. For other deploy modes like YARN and Mesos, you should compile Spark into a big all-inclusive jar and supply that when you launch your program on YARN/Mesos. See the guide for more details. TD On Sat, May 3, 2014 at 7:24 PM, Weide Zhang weo...@gmail.com wrote: Hi Tathagata, I actually have a separate question. What's the usage of the lib_managed folder inside the spark source folder? Are those the libraries required for spark streaming to run? Do they need to be added to the spark classpath when starting the spark cluster? Weide On Sat, May 3, 2014 at 7:08 PM, Weide Zhang weo...@gmail.com wrote: Hi Tathagata, I figured out the reason. I was adding a wrong kafka lib alongside the version spark uses. Sorry for spamming. Weide On Sat, May 3, 2014 at 7:04 PM, Tathagata Das tathagata.das1...@gmail.com wrote: I am a little confused about the version of Spark you are using. Are you using Spark 0.9.1, which uses scala 2.10.3? TD On Sat, May 3, 2014 at 6:16 PM, Weide Zhang weo...@gmail.com wrote: Hi, I'm trying to run the kafka-word-count example in spark2.9.1. I encountered some exceptions when initializing the kafka consumer/producer config. I'm using scala 2.10.3 and used the maven build inside the spark streaming kafka library that comes with spark2.9.1. Has anyone seen this exception before? Thanks, producer:

[java] The args attribute is deprecated. Please use nested arg elements.
[java] Exception in thread "main" java.lang.NoClassDefFoundError: scala/Tuple2$mcLL$sp
[java] at kafka.producer.ProducerConfig.<init>(ProducerConfig.scala:56)
[java] at com.turn.apache.KafkaWordCountProducer$.main(HelloWorld.scala:89)
[java] at com.turn.apache.KafkaWordCountProducer.main(HelloWorld.scala)
[java] Caused by: java.lang.ClassNotFoundException: scala.Tuple2$mcLL$sp
[java] at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
[java] at java.security.AccessController.doPrivileged(Native Method)
[java] at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
[java] at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
[java] at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
[java] at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
[java] ... 3 more
[java] Java Result: 1
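A sketch of the sbt setup TD describes for running locally - declare Spark as a dependency and let sbt resolve the jars (the project name and version numbers are examples for the 0.9.1 line):

// build.sbt
name := "kafka-word-count"
scalaVersion := "2.10.3"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "0.9.1",
  "org.apache.spark" %% "spark-streaming"       % "0.9.1",
  "org.apache.spark" %% "spark-streaming-kafka" % "0.9.1"
)

With this, `sbt run` puts all the managed jars (the lib_managed contents) on the classpath automatically.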
Re: sbt run with spark.ContextCleaner ERROR
Can you tell which version of Spark you are using? Spark 1.0 RC3, or something intermediate? And do you call sparkContext.stop at the end of your application? If so, does this error occur before or after the stop()? TD On Sun, May 4, 2014 at 2:40 AM, wxhsdp wxh...@gmail.com wrote: Hi all, I use sbt to run my spark application; after the app completes, this error occurs: 14/05/04 17:32:28 INFO network.ConnectionManager: Selector thread was interrupted! 14/05/04 17:32:28 ERROR spark.ContextCleaner: Error in cleaning thread java.lang.InterruptedException at java.lang.Object.wait(Native Method) at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135) at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:116) at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:64) Has anyone met this before? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sbt-run-with-spark-ContextCleaner-ERROR-tp5304.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: another updateStateByKey question
Could be a bug. Can you share the code, along with data that I can use to reproduce this? TD On May 2, 2014 9:49 AM, Adrian Mocanu amoc...@verticalscope.com wrote: Has anyone else noticed that *sometimes* the same tuple calls the update state function twice? I have 2 tuples with the same key in 1 RDD that is part of the DStream: RDD[ (a,1), (a,2) ]. When the update function is called the first time, Seq[V] has data: 1, 2, which is correct: StateClass(3,2, ArrayBuffer(1, 2)). Then right away (I see this in my output) the same key is used and the function is called again, but this time Seq is empty: StateClass(3,2, ArrayBuffer( )). In the update function I also save Seq[V] to state so I can see it in the RDD; I also show a count and sum of the values: StateClass(sum, count, Seq[V]). Why is the update function called with an empty Seq[V] on the same key when all values for that key have already been taken care of in a previous update? -Adrian
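For context, updateStateByKey invokes the update function in every batch for every key that has existing state, even when no new values arrived for that key - in which case Seq[V] is empty. A minimal sketch of an update function that handles that case (the StateClass shape mirrors Adrian's description; pairStream is a placeholder DStream[(String, Int)]):

case class StateClass(sum: Int, count: Int, values: Seq[Int])

val updateFunc = (values: Seq[Int], state: Option[StateClass]) => {
  val prev = state.getOrElse(StateClass(0, 0, Seq.empty))
  if (values.isEmpty) {
    Some(prev)  // no new data for this key in this batch; carry the state forward unchanged
  } else {
    Some(StateClass(prev.sum + values.sum, prev.count + values.size, values))
  }
}
val stateStream = pairStream.updateStateByKey(updateFunc)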
Re: Spark streaming
Take a look at the RDD.pipe() operation. That allows you to pipe the data in an RDD to any external shell command (just like a Unix shell pipe). On May 1, 2014 10:46 AM, Mohit Singh mohit1...@gmail.com wrote: Hi, I guess Spark is using "streaming" in the context of streaming live data, but what I mean is something more along the lines of hadoop streaming... where one can code in any programming language? Or is something along those lines on the cards? Thanks -- Mohit "When you want success as badly as you want the air, then you will get it. There is no other secret of success." -Socrates
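A quick sketch of RDD.pipe() (the paths are placeholders; any executable that reads stdin and writes stdout works, in whatever language):

val lines = sc.textFile("hdfs:///data/input")
// each partition's elements are written to the process's stdin, one per line;
// each line the process writes to stdout becomes an element of the result RDD
val piped = lines.pipe("/path/to/my_script.py")
piped.take(5).foreach(println)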
Re: What is Seq[V] in updateStateByKey?
Depends on your code. Referring to the earlier example, if you do

words.map(x => (x, 1)).updateStateByKey(...)

then for a particular word, if a batch contains 6 occurrences of that word, the Seq[V] will be [1, 1, 1, 1, 1, 1]. Instead, if you do

words.map(x => (x, 1)).reduceByKey(_ + _).updateStateByKey(...)

then Seq[V] will be [ 6 ], that is, all the 1s will already have been summed up by the reduceByKey. TD On Thu, May 1, 2014 at 7:29 AM, Adrian Mocanu amoc...@verticalscope.com wrote: So Seq[V] contains only new tuples. I initially thought that whenever a new tuple was found, it would add it to Seq and call the update function immediately, so there wouldn't be more than 1 update to Seq per function call. Say I want to sum tuples with the same key in an RDD using updateStateByKey. Then (1) Seq[V] would contain the numbers for a particular key and my S state could be the sum? Or would (2) Seq contain partial sums (say, a sum per partition?) which I then need to sum into the final sum? After writing this out and thinking a little more about it, I think #2 is correct. Can you confirm? Thanks again! -A -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: April-30-14 4:30 PM To: user@spark.apache.org Subject: Re: What is Seq[V] in updateStateByKey? S is the previous count, if any. Seq[V] are potentially many new counts. All of them have to be added together to keep an accurate total. It's as if the count were 3, and I tell you I've just observed 2, 5, and 1 additional occurrences -- the new count is 3 + (2+5+1) not 1 + 1. I butted in since I'd like to ask a different question about the same line of code. Why: val currentCount = values.foldLeft(0)(_ + _) instead of val currentCount = values.sum This happens a few places in the code. sum seems equivalent and likely quicker. Same with things like filter(_ == 200).size instead of count(_ == 200)... pretty trivial but hey. On Wed, Apr 30, 2014 at 9:23 PM, Adrian Mocanu amoc...@verticalscope.com wrote: Hi TD, Why does the example keep recalculating the count via fold? Wouldn’t it make more sense to get the last count in the values Seq and add 1 to it and save that as the current count? From what Sean explained I understand that all values in Seq have the same key. Then when a new value for that key is found it is added to this Seq collection and the update function is called. Is my understanding correct?
Re: range partitioner with updateStateByKey
Ordered by what? Arrival order? Sort order? TD On Thu, May 1, 2014 at 2:35 PM, Adrian Mocanu amoc...@verticalscope.com wrote: If I use a range partitioner, will this make updateStateByKey take the tuples in order? Right now I see them not being taken in order (most of them are ordered, but not all). -Adrian
Re: What is Seq[V] in updateStateByKey?
Yeah, I remember changing fold to sum in a few places, probably in testsuites, but missed this example I guess. On Wed, Apr 30, 2014 at 1:29 PM, Sean Owen so...@cloudera.com wrote: S is the previous count, if any. Seq[V] are potentially many new counts. All of them have to be added together to keep an accurate total. It's as if the count were 3, and I tell you I've just observed 2, 5, and 1 additional occurrences -- the new count is 3 + (2+5+1) not 1 + 1. I butted in since I'd like to ask a different question about the same line of code. Why: val currentCount = values.foldLeft(0)(_ + _) instead of val currentCount = values.sum This happens a few places in the code. sum seems equivalent and likely quicker. Same with things like filter(_ == 200).size instead of count(_ == 200)... pretty trivial but hey. On Wed, Apr 30, 2014 at 9:23 PM, Adrian Mocanu amoc...@verticalscope.com wrote: Hi TD, Why does the example keep recalculating the count via fold? Wouldn’t it make more sense to get the last count in values Seq and add 1 to it and save that as current count? From what Sean explained I understand that all values in Seq have the same key. Then when a new value for that key is found it is added to this Seq collection and the update function is called. Is my understanding correct?
Re: What is Seq[V] in updateStateByKey?
You may have already seen it, but I will mention it anyway. This example may help: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala Here the state is essentially a running count of the words seen. So the value type (i.e., V) is Int (count of a word in each batch) and the state type (i.e., S) is also Int (running count). The update function essentially sums up the running count with the new count to generate a new running count. TD On Tue, Apr 29, 2014 at 1:49 PM, Sean Owen so...@cloudera.com wrote: The original DStream is of (K,V). This function creates a DStream of (K,S). Each time slice brings one or more new V for each K. The old state S (can be different from V!) for each K -- possibly non-existent -- is updated in some way by a bunch of new V, to produce a new state S -- which also might not exist anymore after the update. That's why the function is from a Seq[V], and an Option[S], to an Option[S]. If your RDD has value type V = Double then your function needs to update state based on a new Seq[Double] at each time slice, since Doubles are the new things arriving for each key at each time slice. On Tue, Apr 29, 2014 at 7:50 PM, Adrian Mocanu amoc...@verticalscope.com wrote: What is Seq[V] in updateStateByKey? Does this store the collected tuples of the RDD in a collection? Method signature: def updateStateByKey[S: ClassTag] ( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)] In my case I used Seq[Double], assuming a sequence of Doubles in the RDD; the moment I switched to a different type like Seq[(String, Double)] the code didn’t compile. -Adrian
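The update function in that example looks roughly like this (a sketch of the running word count; see the linked file for the exact code; wordDstream is a DStream[(String, Int)]):

val updateFunc = (newCounts: Seq[Int], runningCount: Option[Int]) => {
  // add this batch's counts for the word to the running total
  Some(runningCount.getOrElse(0) + newCounts.sum)
}
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)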
Re: Spark's behavior
Strange! Can you just do lines.print() to print the raw data instead of doing the word count? Beyond that, we can do two things. 1. Can you see the Spark stage UI to check whether there are stages running during the 30-second period you referred to? 2. If you upgrade to using the Spark master branch (or Spark 1.0 RC3, see the different thread by Patrick), it has a streaming UI, which shows the number of records received, the state of the receiver, etc. That may be more useful in debugging what's going on. TD On Tue, Apr 29, 2014 at 3:31 PM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi TD, We are not using the stream context with master local; we have 1 Master and 8 Workers and 1 word source. The command line that we are using is: bin/run-example org.apache.spark.streaming.examples.JavaNetworkWordCount spark://192.168.0.13:7077 On Apr 30, 2014, at 0:09, Tathagata Das tathagata.das1...@gmail.com wrote: Is your batch size 30 seconds by any chance? Assuming not, please check whether you are creating the streaming context with master local[n] where n > 1. With local or local[1], the system only has one processing slot, which is occupied by the receiver, leaving no room for processing the received data. It could be that after 30 seconds, the server disconnects, the receiver terminates, releasing the single slot for the processing to proceed. TD On Tue, Apr 29, 2014 at 2:28 PM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi TD, In my tests with spark streaming, I'm using the JavaNetworkWordCount (modified) code and a program that I wrote that sends words to the Spark worker; I use TCP as transport. I verified that after starting Spark, it connects to my source, which actually starts sending, but the first word count is advertised approximately 30 seconds after the context creation. So I'm wondering where the 30 seconds of data already sent by the source is stored. Is this normal spark behaviour? I saw the same behaviour using the shipped JavaNetworkWordCount application. Many thanks. -- Informativa sulla Privacy: http://www.unibs.it/node/8155
Re: Java Spark Streaming - SparkFlumeEvent
You can get the internal AvroFlumeEvent inside the SparkFlumeEvent using SparkFlumeEvent.event. That should probably give you all the original text data. On Mon, Apr 28, 2014 at 5:46 AM, Kulkarni, Vikram vikram.kulka...@hp.com wrote: Hi Spark-users, Within my Spark Streaming program, I am able to ingest data sent by my Flume Avro Client. I configured a 'spooling directory source' to write data to a Flume Avro Sink (the Spark Streaming driver program in this case). The default deserializer, i.e. LINE, is used to parse the file into events. Therefore I am expecting an event (SparkFlumeEvent) for every line in the log file. My Spark Streaming code snippet here:

System.out.println("Setting up Flume Stream using Avro Sink at: " + avroServer + ":" + avroPort);
// JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("XXX.YYY.XXX.YYY", port);
JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, avroServer, avroPort);
flumeStream.count();
flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
  @Override
  public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
    List<SparkFlumeEvent> events = eventsData.collect();
    Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
    System.out.println("Received Spark Flume Events: " + events.size());
    while (batchedEvents.hasNext()) {
      SparkFlumeEvent flumeEvent = batchedEvents.next();
      // System.out.println("SparkFlumeEvent = " + flumeEvent);
      // System.out.println(flumeEvent.toString());
      // TODO: How to build each line in the file using this SparkFlumeEvent object?
    }
    return null;
  }
});

Within this while loop, how do I extract each line that was streamed using the SparkFlumeEvent object? I intend to then parse this line, extract various fields and then persist it to memory. Regards, Vikram
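To make TD's answer concrete, a sketch of extracting the line inside that while loop (assuming the LINE deserializer put one line of text in each event body; the charset is an assumption):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.flume.source.avro.AvroFlumeEvent;

AvroFlumeEvent avroEvent = flumeEvent.event();
ByteBuffer body = avroEvent.getBody();
byte[] bytes = new byte[body.remaining()];
body.duplicate().get(bytes);                // copy out the body without disturbing the buffer
String line = new String(bytes, StandardCharsets.UTF_8);
// now parse the line and extract the fields as needed
System.out.println("Flume event body: " + line);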
Re: Bind exception while running FlumeEventCount
Hello Neha, This is the result of a known bug in 0.9. Can you try running the latest Spark master branch to see if this problem is resolved? TD On Tue, Apr 22, 2014 at 2:48 AM, NehaS Singh nehas.si...@lntinfotech.com wrote: Hi, I have installed spark-0.9.0-incubating-bin-cdh4 and I am using apache flume for streaming. I have used the streaming.examples.FlumeEventCount. Also I have written an Avro conf file for flume. When I try to do streaming in spark and I run the following command, it throws this error:

./bin/run-example org.apache.spark.streaming.examples.FlumeEventCount local[2] ip 10001

org.jboss.netty.channel.ChannelException: Failed to bind to: /ip:9988
at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:74)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:68)
at org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:143)
at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:126)
at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:173)
at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:169)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.net.BindException: Cannot assign requested address
at sun.nio.ch.Net.bind(Native Method)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
... 3 more

Can you please help? This is my flumetest.conf:

a1.sources = tail-file
a1.channels = c1
a1.sinks = avro-sink
# define the flow
a1.sources.tail-file.channels = c1
a1.sinks.avro-sink.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
# define source and sink
a1.sources.tail-file.type = exec
a1.sources.tail-file.command = tail -F /home/hduser/Flume/test.log
#a1.sources.tail-file.channels = c1
a1.sinks.avro-sink.type = avro
a1.sinks.avro-sink.hostname = ip   # agent a2's ip address or host name
a1.sinks.avro-sink.port = 10001
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1
a1.channels.c1.transactionCapacity = 1
# Bind the source and sink to the channel
a1.sources.tail-file.channels = c1
a1.sinks.avro-sink.channel = c1

Flumeslavetest.conf:

a2.sources = avro-collection-source
a2.sinks = hdfs-sink
a2.channels = mem-channel
# define the flow
a2.sources.avro-collection-source.channels = mem-channel
a2.sinks.hdfs-sink.channel = mem-channel
a2.channels.mem-channel.type = memory
a2.channels.mem-channel.capacity = 1000
# avro source properties
a2.sources.avro-collection-source.type = avro
a2.sources.avro-collection-source.bind = ip   # agent a2's ip address or host name
a2.sources.avro-collection-source.port = 10001
# hdfs sink properties
a2.sinks.hdfs-sink.type = hdfs
a2.sinks.hdfs-sink.hdfs.writeFormat = Text
a2.sinks.hdfs-sink.hdfs.filePrefix = testing
a2.sinks.hdfs-sink.hdfs.path = hdfs://ip:8020/testingData

My flume is running properly; it is able to write files on hdfs. Please help as to how to resolve the error. Regards, Neha Singh
Re: Strange behaviour of different SSCs with same Kafka topic
Are you by any chance starting two StreamingContexts in the same JVM? That could explain a lot of the weird mixing of data that you are seeing. It is not a supported usage scenario to start multiple StreamingContexts simultaneously in the same JVM. TD On Thu, Apr 17, 2014 at 10:58 PM, gaganbm gagan.mis...@gmail.com wrote: It happens at a normal data rate, i.e., let's say 20 records per second. Apart from that, I am also getting some more strange behavior. Let me explain. I establish two SSCs and start them one after another. In the SSCs I get the streams from Kafka sources, and do some manipulations. Like, adding some Record_Name, for example, to each of the incoming records. Now this Record_Name is different for the two SSCs, and I get this field from some other class, not relevant to the streams. Now, the expected behavior should be that all records in SSC1 get added with the field RECORD_NAME_1 and all records in SSC2 get added with the field RECORD_NAME_2. The two SSCs have nothing to do with each other, as far as I can tell. However, strangely enough, I find many records in SSC1 get added with RECORD_NAME_2 and vice versa. Is it some kind of serialization issue? That the class which provides this RECORD_NAME gets serialized, is reconstructed, and then some weird thing happens inside? I am unable to figure it out. So, apart from the skewed frequency and volume of records in both the streams, I am getting this inter-mingling of data among the streams. Can you help me with how to use some external data to manipulate the RDD records? Thanks and regards Gagan B Mishra Programmer 560034, Bangalore India On Tue, Apr 15, 2014 at 4:09 AM, Tathagata Das [via Apache Spark User List] [hidden email] wrote: Does this happen at a low event rate for that topic as well, or only for a high volume rate? TD On Wed, Apr 9, 2014 at 11:24 PM, gaganbm [hidden email] wrote: I am really at my wits' end here. I have different Streaming contexts, let's say 2, both listening to the same Kafka topics. I establish the KafkaStream by setting different consumer groups for each of them. Ideally, I should be seeing the Kafka events in both the streams. But what I am getting is really unpredictable. Only one stream gets a lot of events and the other one gets almost nothing, or very little compared to the other. Also the frequency is very skewed: I get a lot of events in one stream continuously, and after some duration I get a few events in the other one. I don't know where I am going wrong. I can see consumer fetcher threads for both the streams that listen to the Kafka topics. I can give further details if needed. Any help will be great. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Strange-behaviour-of-different-SSCs-with-same-Kafka-topic-tp4050.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Strange-behaviour-of-different-SSCs-with-same-Kafka-topic-tp4050p4434.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: question about the SocketReceiver
As long as the socket server sends data through the same connection, the existing code is going to work. socket.getInputStream returns an input stream which will continuously allow you to pull data sent over the connection. The bytesToObjects function continuously reads data from the input stream and converts it into objects. This will continue until the input stream is closed (i.e., the connection is closed). TD On Sun, Apr 20, 2014 at 1:36 AM, YouPeng Yang yypvsxf19870...@gmail.com wrote: Hi, I am studying the structure of Spark Streaming (my spark version is 0.9.0). I have a question about the SocketReceiver. In the onStart function:

protected def onStart() {
  logInfo("Connecting to " + host + ":" + port)
  val socket = new Socket(host, port)
  logInfo("Connected to " + host + ":" + port)
  blockGenerator.start()
  val iterator = bytesToObjects(socket.getInputStream())
  while (iterator.hasNext) {
    val obj = iterator.next
    blockGenerator += obj
  }
}

Here the Socket client is created and reads data iteratively. My question is that the onStart function is only called once by the superclass NetworkReceiver, and correspondingly reads data one time. When the socket server sends data again, how does the SocketReceiver read the input data? I can't find any source hint about the process. In my opinion, the Socket instance should read the data cyclically, as follows:

val is = socket.getInputStream()
while (theEndflag) {
  if (is.available() >= 0) {
    val iterator = bytesToObjects(is)
    while (iterator.hasNext) {
      val obj = iterator.next
      blockGenerator += obj
    }
  }
}
// theEndflag is the end flag of the loop, should be set to false when needed.

I know it may not be the right thought; however I am really curious about the socket read process, because it is hard to understand. Any suggestions will be appreciated.
Re: checkpointing without streaming?
Diana, that is a good question. When you persist an RDD, the system still remembers the whole lineage of parent RDDs that created that RDD. If one of the executors fails and the persisted data is lost (both local disk and memory data will get lost), then the lineage is used to recreate the RDD. The longer the lineage, the more recomputation the system has to do in case of failure, and hence the higher the recovery time. So it's not a good idea to have a very long lineage, as it leads to all sorts of problems, like the one Xiangrui pointed to. Checkpointing an RDD actually saves the RDD data to HDFS and removes the pointers to the parent RDDs (as the data can be regenerated just by reading from the HDFS file). So the RDD's data does not need to be recomputed when a worker fails, just re-read. In fact, the data is also retained across driver restarts, as it is in HDFS. RDD.checkpoint was introduced with streaming because streaming is an obvious use case where the lineage grows infinitely long (for stateful computations where each result depends on all the previously received data). However, this checkpointing is useful for any long-running RDD computation, and I know that people have used RDD.checkpoint() independent of streaming. TD On Mon, Apr 21, 2014 at 1:10 PM, Xiangrui Meng men...@gmail.com wrote: Persist doesn't cut lineage. You might run into a StackOverflow problem with a long lineage. See https://spark-project.atlassian.net/browse/SPARK-1006 for an example. On Mon, Apr 21, 2014 at 12:11 PM, Diana Carroll dcarr...@cloudera.com wrote: When might that be necessary or useful? Presumably I can persist and replicate my RDD to avoid re-computation, if that's my goal. What advantage does checkpointing provide over disk persistence with replication? On Mon, Apr 21, 2014 at 2:42 PM, Xiangrui Meng men...@gmail.com wrote: Checkpoint clears dependencies. You might need checkpoint to cut a long lineage in iterative algorithms. -Xiangrui On Mon, Apr 21, 2014 at 11:34 AM, Diana Carroll dcarr...@cloudera.com wrote: I'm trying to understand when I would want to checkpoint an RDD rather than just persist to disk. Every reference I can find to checkpoint relates to Spark Streaming. But the method is defined in the core Spark library, not Streaming. Does it exist solely for streaming, or are there circumstances unrelated to streaming in which I might want to checkpoint... and if so, like what? Thanks, Diana
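A sketch of using checkpoint() to cut a long lineage in an iterative, non-streaming job, along the lines Xiangrui and TD describe (the checkpoint directory and iteration counts are placeholders):

sc.setCheckpointDir("hdfs://namenode:8020/tmp/checkpoints")
var rdd = sc.parallelize(1 to 1000000).map(_.toDouble)
for (i <- 1 to 100) {
  rdd = rdd.map(x => x * 0.99 + 1.0)  // each iteration adds a step to the lineage
  if (i % 20 == 0) {
    rdd.checkpoint()  // truncates the lineage once the RDD is materialized
    rdd.count()       // force a job so the checkpoint actually happens
  }
}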
Re: Strange behaviour of different SSCs with same Kafka topic
Does this happen at low event rate for that topic as well, or only for a high volume rate? TD On Wed, Apr 9, 2014 at 11:24 PM, gaganbm gagan.mis...@gmail.com wrote: I am really at my wits' end here. I have different Streaming contexts, lets say 2, and both listening to same Kafka topics. I establish the KafkaStream by setting different consumer groups to each of them. Ideally, I should be seeing the kafka events in both the streams. But what I am getting is really unpredictable. Only one stream gets a lot of events and the other one almost gets nothing or very less compared to the other. Also the frequency is very skewed. I get a lot of events in one stream continuously, and after some duration I get a few events in the other one. I don't know where I am going wrong. I can see consumer fetcher threads for both the streams that listen to the Kafka topics. I can give further details if needed. Any help will be great. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Strange-behaviour-of-different-SSCs-with-same-Kafka-topic-tp4050.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark 0.9.1 released
Hi everyone, We have just posted Spark 0.9.1, which is a maintenance release with bug fixes, performance improvements, better stability with YARN and improved parity of the Scala and Python API. We recommend all 0.9.0 users to upgrade to this stable release. This is the first release since Spark graduated as a top level Apache project. Contributions to this release came from 37 developers. The full release notes are at: http://spark.apache.org/releases/spark-release-0-9-1.html You can download the release at: http://spark.apache.org/downloads.html Thanks all the developers who contributed to this release: Aaron Davidson, Aaron Kimball, Andrew Ash, Andrew Or, Andrew Tulloch, Bijay Bisht, Bouke van der Bijl, Bryn Keller, Chen Chao, Christian Lundgren, Diana Carroll, Emtiaz Ahmed, Frank Dai, Henry Saputra, jianghan, Josh Rosen, Jyotiska NK, Kay Ousterhout, Kousuke Saruta, Mark Grover, Matei Zaharia, Nan Zhu, Nick Lanham, Patrick Wendell, Prabin Banka, Prashant Sharma, Qiuzhuang, Raymond Liu, Reynold Xin, Sandy Ryza, Sean Owen, Shixiong Zhu, shiyun.wxm, Stevo Slavić, Tathagata Das, Tom Graves, Xiangrui Meng TD
Re: Spark 0.9.1 released
A small additional note: Please use the direct download links in the Spark Downloads http://spark.apache.org/downloads.html page. The Apache mirrors take a day or so to sync from the main repo, so may not work immediately. TD On Wed, Apr 9, 2014 at 2:54 PM, Tathagata Das tathagata.das1...@gmail.comwrote: Hi everyone, We have just posted Spark 0.9.1, which is a maintenance release with bug fixes, performance improvements, better stability with YARN and improved parity of the Scala and Python API. We recommend all 0.9.0 users to upgrade to this stable release. This is the first release since Spark graduated as a top level Apache project. Contributions to this release came from 37 developers. The full release notes are at: http://spark.apache.org/releases/spark-release-0-9-1.html You can download the release at: http://spark.apache.org/downloads.html Thanks all the developers who contributed to this release: Aaron Davidson, Aaron Kimball, Andrew Ash, Andrew Or, Andrew Tulloch, Bijay Bisht, Bouke van der Bijl, Bryn Keller, Chen Chao, Christian Lundgren, Diana Carroll, Emtiaz Ahmed, Frank Dai, Henry Saputra, jianghan, Josh Rosen, Jyotiska NK, Kay Ousterhout, Kousuke Saruta, Mark Grover, Matei Zaharia, Nan Zhu, Nick Lanham, Patrick Wendell, Prabin Banka, Prashant Sharma, Qiuzhuang, Raymond Liu, Reynold Xin, Sandy Ryza, Sean Owen, Shixiong Zhu, shiyun.wxm, Stevo Slavić, Tathagata Das, Tom Graves, Xiangrui Meng TD
Re: CheckpointRDD has different number of partitions than original RDD
Classpath (from the environment tab): /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2/examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar (System Classpath); http://10.10.41.67:40368/jars/spark-examples_2.10-assembly-0.9.0-incubating.jar (Added By User) From: Tathagata Das [mailto:tathagata.das1...@gmail.com] Sent: Monday, April 07, 2014 7:54 PM To: user@spark.apache.org Subject: Re: CheckpointRDD has different number of partitions than original RDD Few things that would be helpful. 1. Environment settings - you can find them on the environment tab in the Spark application UI. 2. Are you setting the HDFS configuration correctly in your Spark program? For example, can you write an HDFS file from a Spark program (say spark-shell) to your HDFS installation and read it back into Spark (i.e., create an RDD)? You can test this by writing an RDD as a text file from the shell, and then trying to read it back from another shell. 3. If that works, then let's try explicitly checkpointing an RDD. To do this you can take any RDD and do the following:

myRDD.checkpoint()
myRDD.count()

If there is some issue, then this should reproduce the above error. TD On Mon, Apr 7, 2014 at 3:48 PM, Paul Mogren pmog...@commercehub.com wrote: Hello, Spark community! My name is Paul. I am a Spark newbie, evaluating version 0.9.0 without any Hadoop at all, and need some help. I run into the following error with the StatefulNetworkWordCount example (and similarly in my prototype app, when I use the updateStateByKey operation). I get this when running against my small cluster, but not (so far) against local[2].

61904 [spark-akka.actor.default-dispatcher-2] ERROR org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1396905956000 ms.0
org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[310] at take at DStream.scala:586(0) has different number of partitions than original RDD MapPartitionsRDD[309] at mapPartitions at StateDStream.scala:66(2)
at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:99)
at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:989)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:855)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:870)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:884)
at org.apache.spark.rdd.RDD.take(RDD.scala:844)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:586)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:585)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:155)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:744)

Please let me know what other information would be helpful; I didn't find any question submission guidelines. Thanks, Paul
Re: CheckpointRDD has different number of partitions than original RDD
Few things that would be helpful. 1. Environment settings - you can find them on the environment tab in the Spark application UI. 2. Are you setting the HDFS configuration correctly in your Spark program? For example, can you write an HDFS file from a Spark program (say spark-shell) to your HDFS installation and read it back into Spark (i.e., create an RDD)? You can test this by writing an RDD as a text file from the shell, and then trying to read it back from another shell. 3. If that works, then let's try explicitly checkpointing an RDD. To do this you can take any RDD and do the following:

myRDD.checkpoint()
myRDD.count()

If there is some issue, then this should reproduce the above error. TD On Mon, Apr 7, 2014 at 3:48 PM, Paul Mogren pmog...@commercehub.com wrote: Hello, Spark community! My name is Paul. I am a Spark newbie, evaluating version 0.9.0 without any Hadoop at all, and need some help. I run into the following error with the StatefulNetworkWordCount example (and similarly in my prototype app, when I use the updateStateByKey operation). I get this when running against my small cluster, but not (so far) against local[2].

61904 [spark-akka.actor.default-dispatcher-2] ERROR org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1396905956000 ms.0
org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[310] at take at DStream.scala:586(0) has different number of partitions than original RDD MapPartitionsRDD[309] at mapPartitions at StateDStream.scala:66(2)
at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:99)
at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:989)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:855)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:870)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:884)
at org.apache.spark.rdd.RDD.take(RDD.scala:844)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:586)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:585)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:155)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:744)

Please let me know what other information would be helpful; I didn't find any question submission guidelines. Thanks, Paul
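A concrete spark-shell version of TD's suggested checks (the HDFS paths are placeholders for your installation):

// step 2: HDFS round trip
val data = sc.parallelize(1 to 1000)
data.saveAsTextFile("hdfs://namenode:8020/tmp/spark-hdfs-test")
sc.textFile("hdfs://namenode:8020/tmp/spark-hdfs-test").count()  // expect 1000

// step 3: explicit checkpoint
sc.setCheckpointDir("hdfs://namenode:8020/tmp/spark-checkpoints")
val myRDD = sc.parallelize(1 to 100).map(_ * 2)
myRDD.checkpoint()
myRDD.count()  // the checkpoint runs as part of this job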
Re: Spark Streaming + Kafka + Mesos/Marathon strangeness
The cleaner TTL was introduced as a brute-force method to clean all old data and metadata in the system, so that the system can run 24/7. The cleaner TTL should be set to a large value, so that RDDs older than that are not used. Though there are some cases where you may want to use an RDD again and again for an infinite duration, in which case no value of TTL is good enough. There are ways around it - if you are going to use an RDD beyond the TTL value, then you will have to recreate the RDD after some interval. However, the ideal solution is to identify the stuff (RDDs, broadcasts, etc.) that is ready to be GCed and then clear the associated data. This is currently being implemented as part of this PR: https://github.com/apache/spark/pull/126 . This will make setting the cleaner TTL unnecessary and reduce errors related to cleaning up needed RDDs. On Thu, Mar 27, 2014 at 3:47 PM, Evgeny Shishkin itparan...@gmail.com wrote: On 28 Mar 2014, at 01:44, Tathagata Das tathagata.das1...@gmail.com wrote: The more I think about it, the problem is not about /tmp; it's more about the workers not having enough memory. Blocks of received data could be falling out of memory before they are processed. BTW, what is the storage level that you are using for your input stream? If you are using MEMORY_ONLY, then try MEMORY_AND_DISK. That is safer, because it ensures that if received data falls out of memory it will at least be saved to disk. TD And I saw such errors because of cleaner.ttl, which erases everything - even needed RDDs. On Thu, Mar 27, 2014 at 2:29 PM, Scott Clasen scott.cla...@gmail.com wrote: Heh, sorry that wasn't a clear question. I know 'how' to set it, but I don't know what value to use in a mesos cluster; since the processes are running in lxc containers they won't be sharing a filesystem (or machine for that matter). Can I use an s3n:// url for local dir? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Kafka-Mesos-Marathon-strangeness-tp3285p3373.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
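For reference, a sketch of setting the property under discussion (the value is illustrative; spark.cleaner.ttl is in seconds):

val conf = new SparkConf().setAppName("MyStreamingApp")
  .set("spark.cleaner.ttl", "3600")  // metadata and persisted RDDs older than 1 hour may be cleared
val ssc = new StreamingContext(conf, Seconds(2))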
Re: spark streaming and the spark shell
Seems like the configuration of the Spark worker is not right. Either the worker has not been given enough memory, or the allocation of memory to RDD storage needs to be fixed. If configured correctly, the Spark workers should not get OOMs. On Thu, Mar 27, 2014 at 2:52 PM, Evgeny Shishkin itparan...@gmail.com wrote: 2. I notice that once I start ssc.start(), my stream starts processing and continues indefinitely... even if I close the socket on the server end (I'm using the unix command nc to mimic a server, as explained in the streaming programming guide). Can I tell my stream to detect if it's lost a connection and therefore stop executing? (Or even better, to attempt to re-establish the connection?) Currently, not yet. But I am aware of this, and this behavior will be improved in the future. Now I understand why our spark streaming job starts to generate zero-sized RDDs from its kafka input when one worker gets an OOM or crashes. And we can't detect it! Great. So spark streaming just doesn't suit 24/7 operation yet =\
Re: Spark Streaming - Shared hashmaps
When you say "launch long-running tasks", does that mean long-running Spark jobs/tasks, or long-running tasks in another system? If the rate of requests from Kafka is low (in terms of records per second), you could collect the records in the driver and maintain the shared bag in the driver. A separate thread in the driver could pick stuff from the bag and launch tasks. This is a slightly unorthodox use of Spark Streaming, but it should work. If the rate of requests from Kafka is high, then I am not sure how you can sustain that many long-running tasks (assuming 1 task corresponding to each request from Kafka). TD On Wed, Mar 26, 2014 at 1:19 AM, Bryan Bryan bryanbryan...@gmail.com wrote: Hi there, I have read about the two fundamental shared features in spark (broadcast variables and accumulators), but this is what I need: I'm using spark streaming in order to get requests from Kafka. These requests may launch long-running tasks, and I need to control them: 1) Keep them in a shared bag, like a HashMap, to retrieve them by ID, for example. 2) Retrieve an instance of this object/task on-demand (on-request, in fact). Any idea about that? How can I share objects between slaves? May I use something outside spark (maybe Hazelcast)? Regards.
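A sketch of the driver-side pattern TD suggests, assuming a low request rate (the Request type and the requestStream DStream are placeholders):

import java.util.concurrent.ConcurrentHashMap

case class Request(id: String, payload: String)           // placeholder shape
val sharedBag = new ConcurrentHashMap[String, Request]()  // lives in the driver JVM

requestStream.foreachRDD { rdd =>
  // collect() brings this batch's requests back to the driver
  rdd.collect().foreach(req => sharedBag.put(req.id, req))
}
// a separate driver-side thread can look entries up in sharedBag by ID
// and launch the long-running work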
Re: streaming questions
Answer 1: Make sure you are using master as local[n] with n > 1 (assuming you are running it in local mode). The way Spark Streaming works is that it assigns a core to the data receiver, and so if you run the program with only one core (i.e., with local or local[1]), then it won't have resources to process data along with receiving it. Answer 2: Spark Streaming is designed to replicate the received data within the machines in a Spark cluster for fault-tolerance. However, when you are running in local mode, since there is only one machine, the blocks of data aren't able to replicate. This is expected and safe to ignore in local mode. Answer 3: You can do something like

wordCounts.foreachRDD((rdd: RDD[...], time: Time) => {
  if (rdd.take(1).size == 1) {
    // There exists at least one element in the RDD, so save it to file
    rdd.saveAsTextFile(/* generate file name based on time */)
  }
})

TD On Wed, Mar 26, 2014 at 11:08 AM, Diana Carroll dcarr...@cloudera.com wrote: I'm trying to understand Spark Streaming, hoping someone can help. I've kinda-sorta got a version of word count running, and it looks like this:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object StreamingWordCount {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: StreamingWordCount <master> <hostname> <port>")
      System.exit(1)
    }
    val master = args(0)
    val hostname = args(1)
    val port = args(2).toInt
    val ssc = new StreamingContext(master, "Streaming Word Count", Seconds(2))
    val lines = ssc.socketTextStream(hostname, port)
    val words = lines.flatMap(line => line.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

(I also have a small script that sends text to that port.) Question 1: When I run this, I don't get any output from wordCounts.print as long as my data is still streaming. I have to stop my streaming data script before my program will display the word counts. Why is that? What if my stream is indefinite? I thought the point of streaming was that it would process it in real time? Question 2: While I run this (and the stream is still sending) I get continuous warning messages like this: 14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623200 already exists on this machine; not re-adding it 14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623400 already exists on this machine; not re-adding it What does that mean? Question 3: I tried replacing the wordCounts.print() line with wordCounts.saveAsTextFiles("file:/my/path/outdir"). This results in the creation of a new outdir-timestamp file every two seconds... even if there's no data during that time period. Is there a way to tell it to save only if there's data? Thanks!
Re: rdd.saveAsTextFile problem
Can you give us a more detailed exception + stack trace from the log? It should be in the driver log. If not, please take a look at the executor logs, through the web UI, to find the stack trace. TD On Tue, Mar 25, 2014 at 10:43 PM, gaganbm gagan.mis...@gmail.com wrote: Hi Folks, Is this issue resolved? If yes, could you please throw some light on how to fix this? I am facing the same problem while writing to text files. When I do

stream.foreachRDD(rdd => {
  rdd.saveAsTextFile("Some path")
})

this works fine for me. But it creates multiple text files for each partition within an RDD. So I tried the coalesce option to merge my results into a single file for each RDD, as:

stream.foreachRDD(rdd => {
  rdd.coalesce(1, true).saveAsTextFile("Some path")
})

This fails with: org.apache.spark.SparkException: Job aborted: Task 75.0:0 failed 1 times (most recent failure: Exception failure: java.lang.IllegalStateException: unread block data) I am using Spark Streaming 0.9.0. Any clue what's going wrong when using coalesce? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/rdd-saveAsTextFile-problem-tp176p3238.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark executor/driver log files management
The logs from the executors are redirected to stdout only because there is a default log4j.properties that is configured to do so. If you put your own log4j.properties with a rolling file appender in the classpath (refer to the Spark docs for that), all the logs will get redirected to separate files that will get rolled over. So even though there will be a stdout file, nothing much will get printed in it. The side effect of this modification is that you won't be able to see the logs from the Spark web UI, as that only shows stdout. On Mon, Mar 24, 2014 at 10:48 PM, Sourav Chandra sourav.chan...@livestream.com wrote: Hi TD, I thought about that, but was not sure whether this would have any impact on the Spark UI / executor runner, as it redirects streams to stderr/stdout. But ideally it should not, as it will fetch the log records from the stderr file (which is latest). Is my understanding correct? Thanks, Sourav On Tue, Mar 25, 2014 at 3:26 AM, Tathagata Das tathagata.das1...@gmail.com wrote: You can use RollingFileAppenders in log4j.properties: http://logging.apache.org/log4j/extras/apidocs/org/apache/log4j/rolling/RollingFileAppender.html You can have other scripts delete old logs. TD On Mon, Mar 24, 2014 at 12:20 AM, Sourav Chandra sourav.chan...@livestream.com wrote: Hi, I have a few questions regarding log file management in spark: 1. Currently I did not find any way to modify the log file name for executors/drivers. It is hardcoded as stdout and stderr. Also there is no log rotation. In the case of a streaming application this will grow forever and become unmanageable. Is there any way to overcome this? Thanks, -- Sourav Chandra Senior Software Engineer · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com -- Sourav Chandra Senior Software Engineer · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
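A sketch of such a log4j.properties using the standard log4j 1.x rolling appender (the file path, size, and backup count are examples to adapt):

# log4j.properties placed on the executor classpath
log4j.rootLogger=INFO, rolling
log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.File=/var/log/spark/executor.log
log4j.appender.rolling.MaxFileSize=50MB
log4j.appender.rolling.MaxBackupIndex=5
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n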
Re: Spark Streaming ZeroMQ Java Example
Unfortunately there isn't one right now. But it is probably not too hard to start with the JavaNetworkWordCount (https://github.com/apache/incubator-spark/blob/master/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java) and use the ZeroMQUtils in the same way as the Scala ZeroMQWordCount example. Basically you have to change this line (https://github.com/apache/incubator-spark/blob/master/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java#L65) and create a ZeroMQ stream rather than a socket stream. Refer to the ZeroMQUtils.createStream docs (http://spark.apache.org/docs/latest/api/external/zeromq/index.html#org.apache.spark.streaming.zeromq.ZeroMQUtils$) for more details. Would be great if you can contribute it back ;) TD On Mon, Mar 24, 2014 at 11:28 PM, goofy real goofyrealt...@gmail.com wrote: Is there a ZeroMQWordCount Java sample code? https://github.com/apache/incubator-spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
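[Editor's note] The Scala side that TD points to looks roughly like this, following the pattern of the Scala ZeroMQWordCount example; ssc, url, and topic are assumed to come from the surrounding program:

  import akka.util.ByteString
  import akka.zeromq.Subscribe
  import org.apache.spark.streaming.zeromq.ZeroMQUtils

  // Convert raw ZeroMQ frames into strings, as in ZeroMQWordCount.
  def bytesToStringIterator(frames: Seq[ByteString]): Iterator[String] =
    frames.map(_.utf8String).iterator

  // This is the call to swap in for socketTextStream in the Java word count.
  val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)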
Re: [bug?] streaming window unexpected behaviour
You can probably do it in a simpler but sort of hacky way! If your window size is W and sliding interval is S, you can do some math to figure out how many of the first windows are actually partial windows. It's probably math.ceil(W/S). So in a windowDStream.foreachRDD() you can increment a global counter to count how many RDDs have been generated, and ignore the first few RDDs.

  windowDStream.foreachRDD(rdd => {
    Global.counter += 1
    if (Global.counter <= math.ceil(W/S)) {
      // partial window, ignore
    } else {
      // do something awesome
    }
  })

On Tue, Mar 25, 2014 at 7:29 AM, Adrian Mocanu amoc...@verticalscope.com wrote: Let me rephrase that: do you think it is possible to use an accumulator to skip the first few incomplete RDDs? -Original Message- From: Adrian Mocanu [mailto:amoc...@verticalscope.com] Sent: March-25-14 9:57 AM To: user@spark.apache.org Cc: u...@spark.incubator.apache.org Subject: RE: [bug?] streaming window unexpected behaviour Thanks TD! Is it possible to perhaps add another window method that doesn't generate partial windows? Or, is it possible to remove the first few partial windows? I'm thinking of using an accumulator to count how many windows there are. -A -Original Message- From: Tathagata Das [mailto:tathagata.das1...@gmail.com] Sent: March-24-14 6:55 PM To: user@spark.apache.org Cc: u...@spark.incubator.apache.org Subject: Re: [bug?] streaming window unexpected behaviour Yes, I believe that is current behavior. Essentially, the first few RDDs will be partial windows (assuming window duration > sliding interval). TD On Mon, Mar 24, 2014 at 1:12 PM, Adrian Mocanu amoc...@verticalscope.com wrote: I have what I would call unexpected behaviour when using window on a stream. I have 2 windowed streams with a 5s batch interval. One windowed stream is (5s,5s)=smallWindow and the other is (10s,5s)=bigWindow. What I've noticed is that the 1st RDD produced by bigWindow is incorrect and is of size 5s, not 10s. So instead of waiting 10s and producing 1 RDD of size 10s, Spark produced the first 10s RDD with size 5s. Why is this happening? To me it looks like a bug; Matei or TD, can you verify that this is correct behaviour? I have the following code:

  val ssc = new StreamingContext(conf, Seconds(5))
  val smallWindowStream = ssc.queueStream(smallWindowRddQueue)
  val bigWindowStream = ssc.queueStream(bigWindowRddQueue)
  val smallWindow = smallWindowReshapedStream.window(Seconds(5), Seconds(5))
    .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
  val bigWindow = bigWindowReshapedStream.window(Seconds(10), Seconds(5))
    .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))

-Adrian
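[Editor's note] A self-contained version of TD's counter sketch might look like the following; Global, W, and S are illustrative names, and the counter lives on the driver, which is where the outer foreachRDD body runs:

  // Hypothetical helper object; windowDStream, W (window seconds), and
  // S (slide seconds) are assumed to come from the surrounding program.
  object Global { var counter = 0 }

  // Number of leading windows that are only partially filled.
  val partialWindows = math.ceil(W.toDouble / S).toInt

  windowDStream.foreachRDD { rdd =>
    Global.counter += 1
    if (Global.counter > partialWindows) {
      // complete window: do something awesome with rdd
    }
    // else: one of the first few partial windows, ignored
  }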
Re: Sliding Window operations do not work as documented
Hello Sanjay, Yes, your understanding of lazy semantics is correct. But ideally every batch should be read based on the batch interval provided in the StreamingContext. Can you open a JIRA on this? On Mon, Mar 24, 2014 at 7:45 AM, Sanjay Awatramani sanjay_a...@yahoo.com wrote: Hi All, I found out why this problem exists. Consider the following scenario: a DStream is created from any source (I've checked with file and socket); no actions are applied to this DStream; a sliding window operation is applied to this DStream, and an action is applied to the sliding window. In this case, Spark will not even read the input stream in batches where no window is due to be computed. Put another way, it won't read the input when it doesn't have to apply the window function. This is happening because all transformations in Spark are lazy. How to fix this or work around it (see line 3):

  JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(1 * 60 * 1000));
  JavaDStream<String> inputStream = stcObj.textFileStream("/Input");
  inputStream.print(); // This is the workaround
  JavaDStream<String> objWindow = inputStream.window(new Duration(windowLen*60*1000), new Duration(slideInt*60*1000));
  objWindow.dstream().saveAsTextFiles("/Output", "");

The Window Operations example in the streaming guide implies that Spark will read the stream in every batch, which is not happening because of the lazy transformations. Wherever a sliding window is used, in most cases no actions are taken on the pre-window batches, hence my gut feeling was that Streaming would read every batch if any actions are being taken in the windowed stream. Regards, Sanjay On Friday, 21 March 2014 8:06 PM, Sanjay Awatramani sanjay_a...@yahoo.com wrote: Hi, I want to run a map/reduce process over the last 5 seconds of data, every 4 seconds. This is quite similar to the sliding window pictorial example under the Window Operations section on http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html . The RDDs returned by the window transformation function are incorrect in my case. To investigate this further, I ran a series of examples with varying values of window length and slide interval. Summary of the test results, as (window length, slide interval) -> result:

  (3,1) -> success
  (4,2) -> success
  (3,2) -> fail
  (4,3) -> fail
  (5,4) -> fail
  (5,2) -> fail

The only condition mentioned in the doc is that the two values (5 & 4) should be multiples of the batch interval (1 in my case), and obviously I get a runtime error if I attempt to violate this condition. Looking at my results, it seems that failures occur when the window length isn't a multiple of the slide interval.
My code:

  JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(1 * 60 * 1000));
  JavaDStream<String> inputStream = stcObj.textFileStream("/Input");
  JavaDStream<String> objWindow = inputStream.window(new Duration(windowLen*60*1000), new Duration(slideInt*60*1000));
  objWindow.dstream().saveAsTextFiles("/Output", "");

Detailed results:

(3,1) -> success
@t_0: [inputStream's RDD@t_0]
@t_1: [inputStream's RDD@t_0,1]
@t_2: [inputStream's RDD@t_0,1,2]
@t_3: [inputStream's RDD@t_1,2,3]
@t_4: [inputStream's RDD@t_2,3,4]
@t_5: [inputStream's RDD@t_3,4,5]

(4,2) -> success
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: [inputStream's RDD@t_2,3,4,5]

(3,2) -> fail
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_2,3] // (expected RDD@t_1,2,3)
@t_4: nothing
@t_5: [inputStream's RDD@t_4,5] // (expected RDD@t_3,4,5)

(4,3) -> fail
@t_0: nothing
@t_1: nothing
@t_2: [inputStream's RDD@t_0,1,2]
@t_3: nothing
@t_4: nothing
@t_5: [inputStream's RDD@t_3,4,5] // (expected RDD@t_2,3,4,5)

(5,4) -> fail
@t_0: nothing
@t_1: nothing
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: nothing
@t_6: nothing
@t_7: [inputStream's RDD@t_4,5,6,7] // (expected RDD@t_3,4,5,6,7)

(5,2) -> fail
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: [inputStream's RDD@t_2,3,4,5] // (expected RDD@t_1,2,3,4,5)
@t_6: nothing
@t_7: [inputStream's RDD@t_4,5,6,7] // (expected RDD@t_3,4,5,6,7)

I have run all the above examples twice to be sure! I believe either my understanding of the sliding window mechanism is incorrect or there is a problem in the sliding window mechanism. Regards, Sanjay
Re: Explain About Logs NetworkWordcount.scala
I am not sure how to debug this without any more information about the source. Can you check on the receiver side whether data is being accepted by the receiver but not reported? TD On Wed, Mar 5, 2014 at 7:23 AM, eduardocalfaia e.costaalf...@unibs.it wrote: Hi TD, I have seen in the web UI that the stage's result has been zero, and there is nothing in the GC Time field. http://apache-spark-user-list.1001560.n3.nabble.com/file/n2306/CaptureStage.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Explain-About-Logs-NetworkWordcount-scala-tp1835p2306.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: NoSuchMethodError - Akka - Props
Are you launching your application using the scala or the java command? The scala command brings in a version of Akka that we have found to conflict with Spark's version of Akka, so it's best to launch using java. TD On Thu, Mar 6, 2014 at 3:45 PM, Deepak Nulu deepakn...@gmail.com wrote: I see the same error. I am trying a standalone example integrated into a Play Framework v2.2.2 application. The error occurs when I try to create a Spark Streaming Context. Compilation succeeds, so I am guessing it has to do with the version of Akka getting picked up at runtime. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-Akka-Props-tp2191p2375.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
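[Editor's note] As a concrete illustration, launching with the java command rather than scala looks roughly like this; the jar paths and main class are placeholders, not from this thread:

  # Put your app jar and the Spark assembly on the classpath and start
  # the JVM directly, so Spark's own Akka version is the one loaded.
  java -cp /path/to/my-app.jar:/path/to/spark-assembly.jar com.example.MyStreamingApp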
Re: NoSuchMethodError in KafkaReciever
I don't have an Eclipse setup, so I am not sure what is going on here. I would try to use Maven on the command line with a POM to see if this compiles. Also, try to clean up your system Maven cache; who knows if it pulled in a wrong version of Kafka 0.8 and has been using it all along. Blowing away the cache and doing a clean compile will make sure the right Kafka jar is loaded. Hope this helps. TD On Sat, Mar 1, 2014 at 8:26 PM, venki-kratos ve...@thekratos.com wrote: I am trying to use code similar to the following:

  public JavaPairDStream<String, String> openStream() {
    HashMap<String, String> kafkaParams = Maps.newHashMap();
    kafkaParams.put(ZK_CONNECT, kafkaConfig.getString(ZK_CONNECT));
    kafkaParams.put(CONSUMER_GRP_ID, kafkaConfig.getString(CONSUMER_GRP_ID));
    Map<String, Integer> topicMap = Maps.newHashMap();
    topicMap.put(kafkaConfig.getString(ZK_TOPIC), kafkaConfig.getInteger(CONSUMER_THREAD_COUNT, 1));
    JavaPairDStream<String, String> inputStream = KafkaUtils.createStream(
        streamingContext, String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());
    return inputStream;
  }

I have spark-streaming_2.10-0.9.0-incubating.jar and spark-streaming-kafka_2.10-0.9.0-incubating.jar in the classpath using the POM and m2e in Eclipse. The JVM version is set to 1.6. I get the following error:

  14/03/02 09:29:15 INFO kafka.KafkaReceiver: Connected to localhost:2181
  14/03/02 09:29:15 ERROR kafka.KafkaReceiver: Error receiving data
  java.lang.NoSuchMethodException: java.lang.Object.<init>(kafka.utils.VerifiableProperties)
  at java.lang.Class.getConstructor0(Class.java:2763)
  at java.lang.Class.getConstructor(Class.java:1693)
  at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:108)
  at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:126)
  at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:173)
  at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:169)
  at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
  at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
  at org.apache.spark.scheduler.Task.run(Task.scala:53)
  at .

This is similar to the code in JavaKafkaStreamSuite.testKafkaStream. I find that the Kafka jar (kafka_2.10-0.8.0) does have such a constructor. What is going wrong? Can someone help solve this mystery and help with my misery? Basically stuck for the last 2 days, as I am a Java guy and would like to develop downstream code in Java. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-in-KafkaReciever-tp2209.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
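[Editor's note] A hedged sketch of what TD's suggestion could look like from the command line; the local-repository paths below are the Maven defaults and may differ on your machine:

  # Do a clean compile from the command line instead of Eclipse/m2e.
  mvn clean compile
  # Remove possibly stale cached Kafka artifacts, then force Maven to
  # re-resolve dependencies on the next build.
  rm -rf ~/.m2/repository/org/apache/kafka
  mvn -U clean package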
Re: Spark streaming on ec2
Yes! Spark Streaming programs are just like any Spark program, and so any EC2 cluster set up using the spark-ec2 scripts can be used to run Spark Streaming programs as well. On Thu, Feb 27, 2014 at 10:11 AM, Aureliano Buendia buendia...@gmail.com wrote: Hi, Does the EC2 support for Spark 0.9 also include Spark Streaming? If not, is there an equivalent?
Re: Spark streaming on ec2
Yes, the default Spark EC2 cluster runs the standalone deploy mode. Since Spark 0.9, the standalone deploy mode allows you to launch the driver app within the cluster itself and automatically restart it if it fails. You can read about launching your app inside the cluster here: http://spark.incubator.apache.org/docs/latest/spark-standalone.html#connecting-an-application-to-the-cluster. Using this you can launch your streaming app as well. TD On Thu, Feb 27, 2014 at 5:35 PM, Aureliano Buendia buendia...@gmail.com wrote: How about the Spark Streaming app itself? Does the EC2 script also provide a means for daemonizing and monitoring Spark Streaming apps which are supposed to run 24/7? If not, any suggestions for how to do this? On Thu, Feb 27, 2014 at 8:23 PM, Tathagata Das tathagata.das1...@gmail.com wrote: ZooKeeper is automatically set up in the cluster as Spark uses ZooKeeper. However, you have to set up your own input source like Kafka or Flume. TD On Thu, Feb 27, 2014 at 10:32 AM, Aureliano Buendia buendia...@gmail.com wrote: On Thu, Feb 27, 2014 at 6:17 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Yes! Spark streaming programs are just like any spark program and so any ec2 cluster setup using the spark-ec2 scripts can be used to run spark streaming programs as well. Great. Does it come with any input source support as well? (E.g. Kafka requires setting up ZooKeeper.) On Thu, Feb 27, 2014 at 10:11 AM, Aureliano Buendia buendia...@gmail.com wrote: Hi, Does the EC2 support for Spark 0.9 also include Spark Streaming? If not, is there an equivalent?
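[Editor's note] To make that concrete, a hedged sketch of launching a driver inside a standalone cluster per the docs linked above; the master URL, jar location, and main class are placeholders, and --supervise is the option that asks the master to restart the driver if it fails:

  # Run from the Spark directory on a machine that can reach the cluster.
  ./bin/spark-class org.apache.spark.deploy.Client launch \
      --supervise \
      spark://<master-host>:7077 \
      hdfs://<namenode>:8020/jars/my-streaming-app.jar \
      com.example.MyStreamingApp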