Re: custom receiver in java

2014-06-04 Thread Tathagata Das
Yes, thanks for updating this old thread! We heard our community's demands and
added support for Java receivers!

TD


On Wed, Jun 4, 2014 at 12:15 PM, lbustelo g...@bustelos.com wrote:

 Note that what TD was referring to above is already in 1.0.0.

 http://spark.apache.org/docs/1.0.0/streaming-custom-receivers.html



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/custom-receiver-in-java-tp3575p6962.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Failed to remove RDD error

2014-06-03 Thread Tathagata Das
It was not intended to be experimental, as it improves general
performance. We have tested the feature since 0.9 and didn't see any problems.
We need to investigate the cause of this. Can you give us the logs showing
this error so that we can analyze it?

TD


On Tue, Jun 3, 2014 at 10:08 AM, Michael Chang m...@tellapart.com wrote:

 Thanks Tathagata,

 Thanks for all your hard work!  In the future, is it possible to mark
 experimental features as such on the online documentation?

 Thanks,
 Michael


 On Mon, Jun 2, 2014 at 6:12 PM, Tathagata Das tathagata.das1...@gmail.com
  wrote:

 spark.streaming.unpersist was an experimental feature introduced with
 Spark 0.9 (but kept disabled), which actively clears off RDDs that are not
 useful any more. In Spark 1.0 it has been enabled by default. It is
 possible that this is an unintended side effect of that. If
 spark.cleaner.ttl works, then that should be used.
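
 For reference, a minimal sketch of how these settings can be toggled on a
 SparkConf (the property names are the ones discussed in this thread; the
 timeout value is just an illustrative assumption):

 import org.apache.spark.SparkConf

 val conf = new SparkConf()
   .setAppName("streaming-app")
   // workaround discussed here: fall back to TTL-based cleanup
   .set("spark.streaming.unpersist", "false")
   .set("spark.cleaner.ttl", "3600")        // seconds to keep old RDDs/metadata
   // optional: give Akka asks more time (value is an assumption)
   .set("spark.akka.askTimeout", "60")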

 TD


 On Mon, Jun 2, 2014 at 9:42 AM, Michael Chang m...@tellapart.com wrote:

 Hey Mayur,

 Thanks for the suggestion, I didn't realize that was configurable.  I
 don't think I'm running out of memory, though it does seem like these
 errors go away when I turn off the spark.streaming.unpersist configuration
 and use spark.cleaner.ttl instead.  Do you know if there are known issues
 with the unpersist option?


 On Sat, May 31, 2014 at 12:17 AM, Mayur Rustagi mayur.rust...@gmail.com
  wrote:

 You can increase your Akka timeout; that should give you some more life.
 Are you running out of memory by any chance?


 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Sat, May 31, 2014 at 6:52 AM, Michael Chang m...@tellapart.com
 wrote:

 I'm running some Kafka streaming Spark contexts (on 0.9.1), and they
 seem to be dying after 10 or so minutes with a lot of these errors.  I
 can't really tell what's going on here, except that maybe the driver is
 unresponsive somehow?  Has anyone seen this before?

 14/05/31 01:13:30 ERROR BlockManagerMaster: Failed to remove RDD 12635

 akka.pattern.AskTimeoutException: Timed out

 at
 akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)

 at akka.actor.Scheduler$$anon$11.run(Scheduler.scala:118)

 at
 scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:691)

 at
 scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:688)

 at
 akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:455)

 at
 akka.actor.LightArrayRevolverScheduler$$anon$12.executeBucket$1(Scheduler.scala:407)

 at
 akka.actor.LightArrayRevolverScheduler$$anon$12.nextTick(Scheduler.scala:411)

 at
 akka.actor.LightArrayRevolverScheduler$$anon$12.run(Scheduler.scala:363)

 at java.lang.Thread.run(Thread.java:744)

 Thanks,

 Mike









Re: NoSuchElementException: key not found

2014-06-03 Thread Tathagata Das
I think I know what is going on! This is probably a race condition in the
DAGScheduler. I have added a JIRA for this. The fix is not trivial, though.

https://issues.apache.org/jira/browse/SPARK-2002

A not-so-good workaround for now would be to not use coalesced RDDs, which
avoids the race condition.

TD


On Tue, Jun 3, 2014 at 10:09 AM, Michael Chang m...@tellapart.com wrote:

 I only had the warning level logs, unfortunately.  There were no other
 references of 32855 (except a repeated stack trace, I believe).  I'm using
 Spark 0.9.1


 On Mon, Jun 2, 2014 at 5:50 PM, Tathagata Das tathagata.das1...@gmail.com
  wrote:

 Do you have the info level logs of the application? Can you grep the
 value 32855 to find any references to it? Also what version of the
 Spark are you using (so that I can match the stack trace, does not seem to
 match with Spark 1.0)?

 TD


 On Mon, Jun 2, 2014 at 3:27 PM, Michael Chang m...@tellapart.com wrote:

 Hi all,

 Seeing a random exception kill my spark streaming job. Here's a stack
 trace:

 java.util.NoSuchElementException: key not found: 32855
 at scala.collection.MapLike$class.default(MapLike.scala:228)
 at scala.collection.AbstractMap.default(Map.scala:58)
 at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
 at
 org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:211)
  at
 org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1072)
 at
 org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:716)
 at
 org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:172)
 at
 org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:189)
 at
 org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:188)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:351)
 at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
 at
 org.apache.spark.rdd.PartitionCoalescer$LocationIterator.init(CoalescedRDD.scala:183)
 at
 org.apache.spark.rdd.PartitionCoalescer.setupGroups(CoalescedRDD.scala:234)
 at
 org.apache.spark.rdd.PartitionCoalescer.run(CoalescedRDD.scala:333)
 at
 org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:81)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
 at
 org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
 at
 org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
 at
 org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
 at
 org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:31)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
 at org.apache.spark.rdd.RDD.take(RDD.scala:830)
 at
 org.apache.spark.api.java.JavaRDDLike$class.take(JavaRDDLike.scala:337)
 at org.apache.spark.api.java.JavaRDD.take(JavaRDD.scala:27)
 at
 com.tellapart.manifolds.spark.ManifoldsUtil$PersistToKafkaFunction.call(ManifoldsUtil.java:87)
 at
 com.tellapart.manifolds.spark.ManifoldsUtil$PersistToKafkaFunction.call(ManifoldsUtil.java:53)
 at
 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:270)
 at
 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:270)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:520

Re: NoSuchElementException: key not found

2014-06-03 Thread Tathagata Das
I am not sure what DStream operations you are using, but some operation is
internally creating CoalescedRDDs. That is causing the race condition. I
might be able to help if you tell me which DStream operations you are using.
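
For context, the usual way a streaming job ends up with CoalescedRDDs is
through repartition / coalesce. A small illustration (the input source and
numbers here are placeholders, not your actual job):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("coalesce-example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)   // placeholder input

// Both of these create CoalescedRDDs internally, which is what can hit the race:
val viaRepartition = lines.repartition(8)
val viaCoalesce    = lines.transform(rdd => rdd.coalesce(4))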

TD


On Tue, Jun 3, 2014 at 4:54 PM, Michael Chang m...@tellapart.com wrote:

 Hi Tathagata,

  Thanks for your help!  By not using coalesced RDDs, do you mean not
  repartitioning my DStream?

 Thanks,
 Mike




 On Tue, Jun 3, 2014 at 12:03 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

  I think I know what is going on! This is probably a race condition in the
  DAGScheduler. I have added a JIRA for this. The fix is not trivial, though.

 https://issues.apache.org/jira/browse/SPARK-2002

  A not-so-good workaround for now would be to not use coalesced RDDs, which
  avoids the race condition.

 TD


 On Tue, Jun 3, 2014 at 10:09 AM, Michael Chang m...@tellapart.com
 wrote:

 I only had the warning level logs, unfortunately.  There were no other
 references of 32855 (except a repeated stack trace, I believe).  I'm using
 Spark 0.9.1


 On Mon, Jun 2, 2014 at 5:50 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Do you have the info level logs of the application? Can you grep the
 value 32855 to find any references to it? Also what version of the
 Spark are you using (so that I can match the stack trace, does not seem to
 match with Spark 1.0)?

 TD


 On Mon, Jun 2, 2014 at 3:27 PM, Michael Chang m...@tellapart.com
 wrote:

 Hi all,

 Seeing a random exception kill my spark streaming job. Here's a stack
 trace:

 java.util.NoSuchElementException: key not found: 32855
 at scala.collection.MapLike$class.default(MapLike.scala:228)
 at scala.collection.AbstractMap.default(Map.scala:58)
 at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
 at
 org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:211)
  at
 org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1072)
 at
 org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:716)
 at
 org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:172)
 at
 org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:189)
 at
 org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:188)
 at
 scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at
 scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:351)
 at
 scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
 at
 org.apache.spark.rdd.PartitionCoalescer$LocationIterator.init(CoalescedRDD.scala:183)
 at
 org.apache.spark.rdd.PartitionCoalescer.setupGroups(CoalescedRDD.scala:234)
 at
 org.apache.spark.rdd.PartitionCoalescer.run(CoalescedRDD.scala:333)
 at
 org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:81)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
 at
 org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
 at
 org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
 at
 org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
 at
 org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:31)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
 at org.apache.spark.rdd.RDD.take(RDD.scala:830)
 at
 org.apache.spark.api.java.JavaRDDLike$class.take(JavaRDDLike.scala:337)
 at org.apache.spark.api.java.JavaRDD.take(JavaRDD.scala:27

Re: Interactive modification of DStreams

2014-06-02 Thread Tathagata Das
Currently Spark Streaming does not support addition/deletion/modification
of DStreams after the streaming context has been started, nor can you
restart a stopped streaming context. Also, multiple SparkContexts (and
therefore multiple streaming contexts) cannot be run concurrently in the
same JVM.

To change the window duration, I would do one of the following.

1. Stop the previous streaming context, create a new streaming context, and
set up the DStreams once again with the new window duration (see the sketch
after this list).
2. Create a custom DStream, say DynamicWindowDStream. Take a look at how
WindowedDStream
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
is implemented (pretty simple, just a union over RDDs across time). That
should allow you to modify the window duration. However, do make sure you
have a maximum window duration that the window will never exceed, and make
sure you define parentRememberDuration
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala#L53
as rememberDuration + maxWindowDuration. That field defines which RDDs can
be forgotten, so it is sensitive to the window duration. Then you have to
take care of modifying the window duration correctly (atomically, etc.) as
per your requirements.
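
A rough, untested sketch of option 1 (the app name, source, and
batch/slide/window values below are placeholders, not recommendations):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def startWithWindow(windowSec: Int): StreamingContext = {
  val conf = new SparkConf().setMaster("local[2]").setAppName("dynamic-window")
  val ssc = new StreamingContext(conf, Seconds(1))
  val lines = ssc.socketTextStream("localhost", 9999)   // placeholder source
  lines.window(Seconds(windowSec), Seconds(2)).count().print()
  ssc.start()
  ssc
}

var ssc = startWithWindow(8)
// ... later, to change the window duration:
ssc.stop()                 // stops the old streaming context (and its SparkContext)
ssc = startWithWindow(16)  // set up the DStreams again with the new duration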

Happy streaming!

TD




On Mon, Jun 2, 2014 at 2:46 PM, lbustelo g...@bustelos.com wrote:

 This is a general question about whether Spark Streaming can be interactive
 like batch Spark jobs. I've read plenty of threads and done my fair bit of
 experimentation and I'm thinking the answer is NO, but it does not hurt to
 ask.

 More specifically, I would like to be able to do:
 1. Add/Remove steps to the Streaming Job
 2. Modify Window durations
 3. Stop and Restart context.

 I've tried the following:

 1. Modify the DStream after it has been started… BOOM! Exceptions
 everywhere.

 2. Stop the DStream, Make modification, Start… NOT GOOD :( In 0.9.0 I was
 getting deadlocks. I also tried 1.0.0 and it did not work.

 3. Based on information provided here
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3371.html
 I was able to prototype modifying the RDD computation within a
 forEachRDD. That is nice, but you are then bound to the specified batch
 size. That got me wanting to modify window durations. Is changing the
 window duration possible?

 4. Tried running multiple streaming contexts from within a single driver
 application and got several exceptions. The first one was a bind exception on
 the web port. Then, once the app actually started running (cores were taken by
 the 1st job), it did not run correctly, with a lot of
 akka.pattern.AskTimeoutException: Timed out.

 I've tried my experiments in 0.9.0, 0.9.1 and 1.0.0 running on a Standalone
 Cluster setup.
 Thanks in advance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Interactive-modification-of-DStreams-tp6740.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Window slide duration

2014-06-02 Thread Tathagata Das
I am assuming that you are referring to the "OneForOneStrategy: key not
found: 1401753992000 ms" error, and not to the previous "Time 1401753992000
ms is invalid ..." message. Those two seem a little unrelated to me. Can you
give us the stack trace associated with the key-not-found error?

TD


On Mon, Jun 2, 2014 at 5:22 PM, Vadim Chekan kot.bege...@gmail.com wrote:

 Hi all,

 I am getting an error:
 
 14/06/02 17:06:32 INFO WindowedDStream: Time 1401753992000 ms is invalid
 as zeroTime is 1401753986000 ms and slideDuration is 4000 ms and difference
 is 6000 ms
 14/06/02 17:06:32 ERROR OneForOneStrategy: key not found: 1401753992000 ms
 

 My relevant code is:
 ===
 ssc = new StreamingContext(conf, Seconds(1))
 val messageEvents = events.
   flatMap(e => evaluatorCached.value.find(e)).
   window(Seconds(8), Seconds(4))
 messageEvents.print()
 ===

 Seems all right to me; the window slide duration (4s) is a multiple of the
 streaming context batch duration (1s). So, what's the problem?

 Spark-v1.0.0

 --
 From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
 explicitly specified



Re: NoSuchElementException: key not found

2014-06-02 Thread Tathagata Das
Do you have the info-level logs of the application? Can you grep for the
value 32855
to find any references to it? Also, what version of Spark are you using
(so that I can match the stack trace; it does not seem to match Spark
1.0)?

TD


On Mon, Jun 2, 2014 at 3:27 PM, Michael Chang m...@tellapart.com wrote:

 Hi all,

 Seeing a random exception kill my spark streaming job. Here's a stack
 trace:

 java.util.NoSuchElementException: key not found: 32855
 at scala.collection.MapLike$class.default(MapLike.scala:228)
 at scala.collection.AbstractMap.default(Map.scala:58)
 at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
 at
 org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:211)
  at
 org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1072)
 at
 org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:716)
 at
 org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:172)
 at
 org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:189)
 at
 org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:188)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:351)
 at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
 at
 org.apache.spark.rdd.PartitionCoalescer$LocationIterator.init(CoalescedRDD.scala:183)
 at
 org.apache.spark.rdd.PartitionCoalescer.setupGroups(CoalescedRDD.scala:234)
 at
 org.apache.spark.rdd.PartitionCoalescer.run(CoalescedRDD.scala:333)
 at
 org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:81)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
 at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
 at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
 at
 org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
 at
 org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:31)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
 at org.apache.spark.rdd.RDD.take(RDD.scala:830)
 at
 org.apache.spark.api.java.JavaRDDLike$class.take(JavaRDDLike.scala:337)
 at org.apache.spark.api.java.JavaRDD.take(JavaRDD.scala:27)
 at
 com.tellapart.manifolds.spark.ManifoldsUtil$PersistToKafkaFunction.call(ManifoldsUtil.java:87)
 at
 com.tellapart.manifolds.spark.ManifoldsUtil$PersistToKafkaFunction.call(ManifoldsUtil.java:53)
 at
 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:270)
 at
 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:270)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:520)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:520)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at
 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:155)
 

Re: Window slide duration

2014-06-02 Thread Tathagata Das
Can you give us all the logs? I would like to see what is clearing the key
1401754908000 ms.

TD


On Mon, Jun 2, 2014 at 5:38 PM, Vadim Chekan kot.bege...@gmail.com wrote:

 Ok, it seems like "Time ... is invalid" is part of the normal workflow: the
 windowed DStream ignores RDDs at moments in time that do not match the
 window sliding interval. But why I am getting the exception is still
 unclear. Here is the full stack:

 14/06/02 17:21:48 INFO WindowedDStream: Time 1401754908000 ms is invalid
 as zeroTime is 1401754907000 ms and slideDuration is 4000 ms and difference
 is 1000 ms
 14/06/02 17:21:48 ERROR OneForOneStrategy: key not found: 1401754908000 ms
 java.util.NoSuchElementException: key not found: 1401754908000 ms
 at scala.collection.MapLike$class.default(MapLike.scala:228)
 at scala.collection.AbstractMap.default(Map.scala:58)
 at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
 at
 org.apache.spark.streaming.dstream.ReceiverInputDStream.getReceivedBlockInfo(ReceiverInputDStream.scala:77)
 at
 org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:225)
 at
 org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:223)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:223)
 at org.apache.spark.streaming.scheduler.JobGenerator.org
 $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
 at
 org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


 On Mon, Jun 2, 2014 at 5:22 PM, Vadim Chekan kot.bege...@gmail.com
 wrote:

 Hi all,

 I am getting an error:
 
 14/06/02 17:06:32 INFO WindowedDStream: Time 1401753992000 ms is invalid
 as zeroTime is 1401753986000 ms and slideDuration is 4000 ms and difference
 is 6000 ms
 14/06/02 17:06:32 ERROR OneForOneStrategy: key not found: 1401753992000 ms
 

 My relevant code is:
 ===
 ssc = new StreamingContext(conf, Seconds(1))
 val messageEvents = events.
   flatMap(e => evaluatorCached.value.find(e)).
   window(Seconds(8), Seconds(4))
 messageEvents.print()
 ===

 Seems all right to me; the window slide duration (4s) is a multiple of the
 streaming context batch duration (1s). So, what's the problem?

 Spark-v1.0.0

 --
 From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
 explicitly specified




 --
 From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
 explicitly specified



Re: Unable to run a Standalone job

2014-05-22 Thread Tathagata Das
How are you launching the application? sbt run? spark-submit? Local
mode or a Spark standalone cluster? Are you packaging all your code into
a jar?
It looks like you have the Spark classes in your execution
environment but are missing some of Spark's dependencies.

TD



On Thu, May 22, 2014 at 2:27 PM, Shrikar archak shrika...@gmail.com wrote:
 Hi All,

 I am trying to run the network count example as a separate standalone job
 and running into some issues.

 Environment:
 1) Mac Mavericks
 2) Latest spark repo from Github.


 I have a structure like this

 Shrikars-MacBook-Pro:SimpleJob shrikar$ find .
 .
 ./simple.sbt
 ./src
 ./src/main
 ./src/main/scala
 ./src/main/scala/NetworkWordCount.scala
 ./src/main/scala/SimpleApp.scala.bk


 simple.sbt
 name := "Simple Project"

 version := "1.0"

 scalaVersion := "2.10.3"

 libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.0.0-SNAPSHOT",
                             "org.apache.spark" %% "spark-streaming" % "1.0.0-SNAPSHOT")

 resolvers += "Akka Repository" at "http://repo.akka.io/releases/"


 I am able to run the SimpleApp which is mentioned in the doc but when I try
 to run the NetworkWordCount app I get error like this am I missing
 something?

 [info] Running com.shrikar.sparkapps.NetworkWordCount
 14/05/22 14:26:47 INFO spark.SecurityManager: Changing view acls to: shrikar
 14/05/22 14:26:47 INFO spark.SecurityManager: SecurityManager:
 authentication disabled; ui acls disabled; users with view permissions:
 Set(shrikar)
 14/05/22 14:26:48 INFO slf4j.Slf4jLogger: Slf4jLogger started
 14/05/22 14:26:48 INFO Remoting: Starting remoting
 14/05/22 14:26:48 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://spark@192.168.10.88:49963]
 14/05/22 14:26:48 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://spark@192.168.10.88:49963]
 14/05/22 14:26:48 INFO spark.SparkEnv: Registering MapOutputTracker
 14/05/22 14:26:48 INFO spark.SparkEnv: Registering BlockManagerMaster
 14/05/22 14:26:48 INFO storage.DiskBlockManager: Created local directory at
 /var/folders/r2/mbj08pb55n5d_9p8588xk5b0gn/T/spark-local-20140522142648-0a14
 14/05/22 14:26:48 INFO storage.MemoryStore: MemoryStore started with
 capacity 911.6 MB.
 14/05/22 14:26:48 INFO network.ConnectionManager: Bound socket to port 49964
 with id = ConnectionManagerId(192.168.10.88,49964)
 14/05/22 14:26:48 INFO storage.BlockManagerMaster: Trying to register
 BlockManager
 14/05/22 14:26:48 INFO storage.BlockManagerInfo: Registering block manager
 192.168.10.88:49964 with 911.6 MB RAM
 14/05/22 14:26:48 INFO storage.BlockManagerMaster: Registered BlockManager
 14/05/22 14:26:48 INFO spark.HttpServer: Starting HTTP Server
 [error] (run-main) java.lang.NoClassDefFoundError:
 javax/servlet/http/HttpServletResponse
 java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
 at org.apache.spark.HttpServer.start(HttpServer.scala:54)
 at
 org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:156)
 at
 org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127)
 at
 org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
 at
 org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
 at
 org.apache.spark.broadcast.BroadcastManager.init(BroadcastManager.scala:35)
 at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
 at org.apache.spark.SparkContext.init(SparkContext.scala:202)
 at
 org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:549)
 at
 org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:561)
 at
 org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:91)
 at com.shrikar.sparkapps.NetworkWordCount$.main(NetworkWordCount.scala:39)
 at com.shrikar.sparkapps.NetworkWordCount.main(NetworkWordCount.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)


 Thanks,
 Shrikar



Re: How to turn off MetadataCleaner?

2014-05-22 Thread Tathagata Das
The cleaner should remain up while the SparkContext is still active (not
stopped). However, here it seems you are stopping the SparkContext
(ssc.stop(true)), so the cleaner should be stopped. However, there was a bug
earlier where some of the cleaners may not have been stopped when the
context is stopped. What version are you using? If it is 0.9.1, I can see
that the cleaner in ShuffleBlockManager
https://github.com/apache/spark/blob/v0.9.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
is not stopped, so it is a bug.

TD


On Thu, May 22, 2014 at 9:24 AM, Adrian Mocanu amoc...@verticalscope.com wrote:

  Hi

 After using Spark's TestSuiteBase to run some tests, I've noticed that at
 the end, after finishing all tests, the cleaner is still running and outputs
 the following periodically:

 INFO  o.apache.spark.util.MetadataCleaner  - Ran metadata cleaner for
 SHUFFLE_BLOCK_MANAGER



 I use the method testOperation, and I've changed it so that it stores the
 pointer to ssc after running setupStreams. Then I use that pointer to turn
 things off, but the cleaner remains up.



 How do I shut down all of Spark, including the cleaner?



 Here is how I changed testOperation method (changes in bold):



   def testOperation[U: ClassTag, V: ClassTag](
       input: Seq[Seq[U]],
       operation: DStream[U] => DStream[V],
       expectedOutput: Seq[Seq[V]],
       numBatches: Int,
       useSet: Boolean
     ) {
     val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
     *val ssc* = setupStreams[U, V](input, operation)
     val output = runStreams[V](ssc, numBatches_, expectedOutput.size)
     verifyOutput[V](output, expectedOutput, useSet)
     *ssc.awaitTermination(500)*
     *ssc.stop(true)*
   }



 -Adrian





Re: Unable to run a Standalone job

2014-05-22 Thread Tathagata Das
How are you getting Spark 1.0.0-SNAPSHOT through Maven? Did you
publish Spark locally, which allowed you to use it as a dependency?

This is weird indeed. SBT should take care of all of Spark's
dependencies.

In any case, you can try the last released Spark 0.9.1 and see if the
problem persists.
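
If you want to keep using the 1.0.0-SNAPSHOT build under sbt run, one thing
that may be worth trying for the missing javax.servlet classes is adding the
servlet API to your own dependencies. Treat the coordinates/version below as
an assumption to verify against your build, not an official fix:

// simple.sbt (addition); artifact and version are an assumption, check Maven Central
libraryDependencies += "javax.servlet" % "javax.servlet-api" % "3.0.1"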


On Thu, May 22, 2014 at 3:59 PM, Shrikar archak shrika...@gmail.com wrote:

 I am running it as sbt run. I am running it locally.

 Thanks,
 Shrikar


 On Thu, May 22, 2014 at 3:53 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 How are you launching the application? sbt run ? spark-submit? local
 mode or Spark standalone cluster? Are you packaging all your code into
 a jar?
 Looks to me that you seem to have spark classes in your execution
 environment but missing some of Spark's dependencies.

 TD



 On Thu, May 22, 2014 at 2:27 PM, Shrikar archak shrika...@gmail.com
 wrote:
  Hi All,
 
  I am trying to run the network count example as a seperate standalone
 job
  and running into some issues.
 
  Environment:
  1) Mac Mavericks
  2) Latest spark repo from Github.
 
 
  I have a structure like this
 
  Shrikars-MacBook-Pro:SimpleJob shrikar$ find .
  .
  ./simple.sbt
  ./src
  ./src/main
  ./src/main/scala
  ./src/main/scala/NetworkWordCount.scala
  ./src/main/scala/SimpleApp.scala.bk
 
 
  simple.sbt
  name := "Simple Project"

  version := "1.0"

  scalaVersion := "2.10.3"

  libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.0.0-SNAPSHOT",
                              "org.apache.spark" %% "spark-streaming" % "1.0.0-SNAPSHOT")

  resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
 
 
  I am able to run the SimpleApp which is mentioned in the doc but when I
 try
  to run the NetworkWordCount app I get error like this am I missing
  something?
 
  [info] Running com.shrikar.sparkapps.NetworkWordCount
  14/05/22 14:26:47 INFO spark.SecurityManager: Changing view acls to:
 shrikar
  14/05/22 14:26:47 INFO spark.SecurityManager: SecurityManager:
  authentication disabled; ui acls disabled; users with view permissions:
  Set(shrikar)
  14/05/22 14:26:48 INFO slf4j.Slf4jLogger: Slf4jLogger started
  14/05/22 14:26:48 INFO Remoting: Starting remoting
  14/05/22 14:26:48 INFO Remoting: Remoting started; listening on
 addresses
  :[akka.tcp://spark@192.168.10.88:49963]
  14/05/22 14:26:48 INFO Remoting: Remoting now listens on addresses:
  [akka.tcp://spark@192.168.10.88:49963]
  14/05/22 14:26:48 INFO spark.SparkEnv: Registering MapOutputTracker
  14/05/22 14:26:48 INFO spark.SparkEnv: Registering BlockManagerMaster
  14/05/22 14:26:48 INFO storage.DiskBlockManager: Created local
 directory at
 
 /var/folders/r2/mbj08pb55n5d_9p8588xk5b0gn/T/spark-local-20140522142648-0a14
  14/05/22 14:26:48 INFO storage.MemoryStore: MemoryStore started with
  capacity 911.6 MB.
  14/05/22 14:26:48 INFO network.ConnectionManager: Bound socket to port
 49964
  with id = ConnectionManagerId(192.168.10.88,49964)
  14/05/22 14:26:48 INFO storage.BlockManagerMaster: Trying to register
  BlockManager
  14/05/22 14:26:48 INFO storage.BlockManagerInfo: Registering block
 manager
  192.168.10.88:49964 with 911.6 MB RAM
  14/05/22 14:26:48 INFO storage.BlockManagerMaster: Registered
 BlockManager
  14/05/22 14:26:48 INFO spark.HttpServer: Starting HTTP Server
  [error] (run-main) java.lang.NoClassDefFoundError:
  javax/servlet/http/HttpServletResponse
  java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
  at org.apache.spark.HttpServer.start(HttpServer.scala:54)
  at
 
 org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:156)
  at
 
 org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127)
  at
 
 org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
  at
 
 org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
  at
 
 org.apache.spark.broadcast.BroadcastManager.init(BroadcastManager.scala:35)
  at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
  at org.apache.spark.SparkContext.init(SparkContext.scala:202)
  at
 
 org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:549)
  at
 
 org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:561)
  at
 
 org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:91)
  at
 com.shrikar.sparkapps.NetworkWordCount$.main(NetworkWordCount.scala:39)
  at com.shrikar.sparkapps.NetworkWordCount.main(NetworkWordCount.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at
 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
  at
 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
  at java.lang.reflect.Method.invoke(Method.java:597)
 
 
  Thanks,
  Shrikar
 





Re: any way to control memory usage when streaming input's speed is faster than the speed of handled by spark streaming ?

2014-05-21 Thread Tathagata Das
Unfortunately, there is no API support for this right now. You could
implement it yourself by implementing your own receiver and controlling the
rate at which objects are received. If you are using any of the standard
receivers (Flume, Kafka, etc.), I recommend looking at the source code of
the corresponding receiver and making your own version of it.

Alternatively, there is an open JIRA
(https://issues.apache.org/jira/browse/SPARK-1341) about implementing this
functionality. You could give it a shot at implementing this in a generic
way so that it can be used for all receivers ;)




On Tue, May 20, 2014 at 8:31 PM, Francis.Hu francis...@reachjunction.com wrote:

  sparkers,



 Is there a better way to control memory usage when the streaming input's speed
 is faster than the speed at which it is handled by Spark Streaming?



 Thanks,

 Francis.Hu



Re: any way to control memory usage when streaming input's speed is faster than the speed of handled by spark streaming ?

2014-05-21 Thread Tathagata Das
Apologies for the premature send.

Unfortunately, there is no API support for this right now. You could
implement it yourself by implementing your own receiver and controlling the
rate at which objects are received. If you are using any of the standard
receivers (Flume, Kafka, etc.), I recommend looking at the source code of
the corresponding receiver and making your own version of the Flume
receiver / Kafka receiver.

Alternatively, there is an open JIRA
(https://issues.apache.org/jira/browse/SPARK-1341) about implementing
this functionality. You could give it a shot at implementing this in a
generic way so that it can be used for all receivers ;)
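
To make the first option concrete, here is a very rough sketch of a throttled
receiver using the (then new) Receiver API from Spark 1.0. The socket source
and the one-second accounting window are placeholders, and on 0.9 the base
class (NetworkReceiver) looks slightly different:

import java.net.Socket
import scala.io.Source
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical throttled socket receiver: caps how many records are pushed
// into Spark Streaming per second.
class ThrottledSocketReceiver(host: String, port: Int, maxRecordsPerSec: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK) {

  def onStart() {
    new Thread("Throttled Receiver") { override def run() { receive() } }.start()
  }

  def onStop() { }  // the receiving loop exits once isStopped is true

  private def receive() {
    val socket = new Socket(host, port)
    val lines = Source.fromInputStream(socket.getInputStream, "UTF-8").getLines()
    var count = 0
    var windowStart = System.currentTimeMillis()
    while (!isStopped && lines.hasNext) {
      if (count >= maxRecordsPerSec) {
        val elapsed = System.currentTimeMillis() - windowStart
        if (elapsed < 1000) Thread.sleep(1000 - elapsed)   // simple throttle
        count = 0
        windowStart = System.currentTimeMillis()
      }
      store(lines.next())   // hand one record to Spark
      count += 1
    }
    socket.close()
  }
}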

TD

On Wed, May 21, 2014 at 12:57 AM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 Unfortunately, there is no API support for this right now. You could
 implement it yourself by implementing your own receiver and controlling the
 rate at which objects are received. If you are using any of the standard
 receivers (Flume, Kafka, etc.), I recommended looking at the source code of
 the corresponding receiver and making your own version of

 Alternatively, there is an open JIRA
 (https://issues.apache.org/jira/browse/SPARK-1341) about implementing
 this functionality. You could give it a shot at
 implementing this in a generic way so that it can be used for all receivers ;)




 On Tue, May 20, 2014 at 8:31 PM, Francis.Hu 
 francis...@reachjunction.com wrote:

  sparkers,



 Is there a better way to control memory usage when streaming input's
 speed is faster than the speed of handled by spark streaming ?



 Thanks,

 Francis.Hu





Re: tests that run locally fail when run through bamboo

2014-05-21 Thread Tathagata Das
This does happen sometimes, but it is only a warning because Spark is designed
to try successive ports until it succeeds. So unless a crazy number of
successive ports are blocked (runaway processes?? insufficient clearing of
ports by the OS??), these errors should not be a problem for the tests passing.
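
That said, if the parallel builds really are fighting over the default UI port
(4040), one thing worth trying in the test setup is pinning or randomizing
the ports per run. The property names below are standard Spark settings; the
values are arbitrary:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("test")
  .set("spark.ui.port", "4050")    // or compute a free port per build
  .set("spark.driver.port", "0")   // 0 = let the OS pick a free port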


On Wed, May 21, 2014 at 2:31 PM, Adrian Mocanu amoc...@verticalscope.com wrote:

  Just found this at the top of the log:



 17:14:41.124 [pool-7-thread-3-ScalaTest-running-StreamingSpikeSpec] WARN
 o.e.j.u.component.AbstractLifeCycle - FAILED
 SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address
 already in use

 build   21-May-2014 17:14:41   java.net.BindException: Address already in
 use



  Is there a way to set these connections up so that they don't all start on
  the same port? (That's my guess for the root cause of the issue.)



 *From:* Adrian Mocanu [mailto:amoc...@verticalscope.com]
 *Sent:* May-21-14 4:58 PM
 *To:* u...@spark.incubator.apache.org; user@spark.apache.org
 *Subject:* tests that run locally fail when run through bamboo



 I have a few test cases for Spark which extend TestSuiteBase from
 org.apache.spark.streaming.

  The tests run fine on my machine, but when I commit to the repo and run the
  tests automatically with Bamboo, the test cases fail with these errors.



 How to fix?





 21-May-2014 16:33:09

 [info] StreamingZigZagSpec:

 21-May-2014 16:33:09

 [info] - compute zigzag indicator in stream *** FAILED ***

 21-May-2014 16:33:09

 [info]   org.apache.spark.SparkException: Job aborted: Task 1.0:1 failed 1
 times (most recent failure: Exception failure:
 java.io.StreamCorruptedException: invalid type code: AC)

 21-May-2014 16:33:09

 [info]   at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)

 21-May-2014 16:33:09

 [info]   at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)

 21-May-2014 16:33:09

 [info]   at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 21-May-2014 16:33:09

 [info]   at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 21-May-2014 16:33:09

 [info]   at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)

 21-May-2014 16:33:09

 [info]   at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)

 21-May-2014 16:33:09

 [info]   at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)

 21-May-2014 16:33:09

 [info]   at scala.Option.foreach(Option.scala:236)

 21-May-2014 16:33:09

 [info]   at
 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)

 21-May-2014 16:33:09

 [info]   at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)

 21-May-2014 16:33:09

 [info]   ...

 21-May-2014 16:33:09

 [info] - compute zigzag indicator in stream with intermittent empty RDDs
 *** FAILED ***

 21-May-2014 16:33:09

 [info]   Operation timed out after 10042 ms (TestSuiteBase.scala:283)

 21-May-2014 16:33:09

 [info] - compute zigzag indicator in stream with 3 empty RDDs *** FAILED
 ***

 21-May-2014 16:33:09

 [info]   org.apache.spark.SparkException: Job aborted: Task 1.0:1 failed 1
 times (most recent failure: Exception failure:
 java.io.FileNotFoundException:
 /tmp/spark-local-20140521163241-1707/0f/shuffle_1_1_1 (No such file or
 directory))

 21-May-2014 16:33:09

 [info]   at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)

 21-May-2014 16:33:09

 [info]   at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)

 21-May-2014 16:33:09

 [info]   at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 21-May-2014 16:33:09

 [info]   at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 21-May-2014 16:33:09

 [info]   at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)

 21-May-2014 16:33:09

 [info]   at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)

 21-May-2014 16:33:09

 [info]   at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)

 21-May-2014 16:33:09

 [info]   at scala.Option.foreach(Option.scala:236)

 21-May-2014 16:33:09

 [info]   at
 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)

 21-May-2014 16:33:09

 [info]   at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)

 21-May-2014 16:33:09

 [info]   ...

 

Re: I want to filter a stream by a subclass.

2014-05-21 Thread Tathagata Das
You could do

records.filter { _.isInstanceOf[Orange] } .map { _.asInstanceOf[Orange] }
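
If you would rather avoid the isInstanceOf/asInstanceOf pair, an equivalent
pattern-matching variant (just a sketch, using the class names from your
message below) is:

val orangeRecords: DStream[Orange] = records.flatMap {
  case o: Orange => Seq(o)            // keep and downcast in one step
  case _         => Seq.empty[Orange]
}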



On Wed, May 21, 2014 at 3:28 PM, Ian Holsman i...@holsman.com.au wrote:

 Hi.
 Firstly I'm a newb (to both Scala  Spark).

 I have a stream, that contains multiple types of records, and I would like
 to create multiple streams based on that

 currently I have it set up as

 class ALL
 class Orange extends ALL
 class Apple extends ALL

 now I can easily add a filter ala
 val records:DStream[ALL] = ...mapper to build the classes off the wire...
 val orangeRecords = records.filter {_.isInstanceOf[Orange]}

 but I would like to have the line be a DStream[Orange] instead of a
 DStream[ALL]
 (So I can access the unique fields in the subclass).

 I'm using 0.9.1 if it matters.

 TIA
 Ian

 --
 Ian Holsman
 i...@holsman.com.au
 PH: + 61-3-9028 8133 / +1-(425) 998-7083



Re: Failed RC-10 yarn-cluster job for FS closed error when cleaning up staging directory

2014-05-21 Thread Tathagata Das
Are you running vanilla Hadoop 2.3.0 or the one that comes with CDH5 /
HDP? We may be able to reproduce this in that case.

TD

On Wed, May 21, 2014 at 8:35 PM, Tom Graves tgraves...@yahoo.com wrote:

 It sounds like something is closing the hdfs filesystem before everyone is
 really done with it. The filesystem gets cached and is shared so if someone
 closes it while other threads are still using it you run into this error.   Is
 your application closing the filesystem? Are you using the event
 logging feature?   Could you share the options you are running with?

 Yarn will retry the application depending on how the Application Master
 attempt fails (this is a configurable setting as to how many times it
 retries).  That is probably the second driver you are referring to.  But
 they shouldn't have overlapped as far as both being up at the same time. Is
 that the case you are seeing?  Generally you want to look at why the first
 application attempt fails.

 Tom



   On Wednesday, May 21, 2014 6:10 PM, Kevin Markey 
 kevin.mar...@oracle.com wrote:


  I tested an application on RC-10 and Hadoop 2.3.0 in yarn-cluster mode
 that had run successfully with Spark-0.9.1 and Hadoop 2.3 or 2.2.  The
 application ran successfully to conclusion, but it ultimately failed.

 There were 2 anomalies...

 1. ASM reported only that the application was ACCEPTED.  It never
 indicated that the application was RUNNING.

 14/05/21 16:06:12 INFO yarn.Client: Application report from ASM:
  application identifier: application_1400696988985_0007
  appId: 7
  clientToAMToken: null
  appDiagnostics:
  appMasterHost: N/A
  appQueue: default
  appMasterRpcPort: -1
  appStartTime: 1400709970857
  yarnAppState: ACCEPTED
  distributedFinalState: UNDEFINED
  appTrackingUrl:
 http://sleepycat:8088/proxy/application_1400696988985_0007/
  appUser: hduser

 Furthermore, it *started a second container*, running two partly
 *overlapping* drivers, when it appeared that the application never
 started.  Each container ran to conclusion as explained above, taking twice
 as long as usual for both to complete.  Both instances had the same
 concluding failure.

 2. Each instance failed as indicated by the stderr log, finding that the 
 *filesystem
 was closed* when trying to clean up the staging directories.

 14/05/21 16:08:24 INFO Executor: Serialized size of result for 1453 is 863
 14/05/21 16:08:24 INFO Executor: Sending result for 1453 directly to driver
 14/05/21 16:08:24 INFO Executor: Finished task ID 1453
 14/05/21 16:08:24 INFO TaskSetManager: Finished TID 1453 in 202 ms on
 localhost (progress: 2/2)
 14/05/21 16:08:24 INFO DAGScheduler: Completed ResultTask(1507, 1)
 14/05/21 16:08:24 INFO TaskSchedulerImpl: Removed TaskSet 1507.0, whose
 tasks have all completed, from pool
 14/05/21 16:08:24 INFO DAGScheduler: Stage 1507 (count at KEval.scala:32)
 finished in 0.417 s
 14/05/21 16:08:24 INFO SparkContext: Job finished: count at
 KEval.scala:32, took 1.532789283 s
 14/05/21 16:08:24 INFO SparkUI: Stopped Spark web UI at
 http://dhcp-brm-bl1-215-1e-east-10-135-123-92.usdhcp.oraclecorp.com:42250
 14/05/21 16:08:24 INFO DAGScheduler: Stopping DAGScheduler
 14/05/21 16:08:25 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor
 stopped!
 14/05/21 16:08:25 INFO ConnectionManager: Selector thread was interrupted!
 14/05/21 16:08:25 INFO ConnectionManager: ConnectionManager stopped
 14/05/21 16:08:25 INFO MemoryStore: MemoryStore cleared
 14/05/21 16:08:25 INFO BlockManager: BlockManager stopped
 14/05/21 16:08:25 INFO BlockManagerMasterActor: Stopping BlockManagerMaster
 14/05/21 16:08:25 INFO BlockManagerMaster: BlockManagerMaster stopped
 14/05/21 16:08:25 INFO SparkContext: Successfully stopped SparkContext
 14/05/21 16:08:25 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
 down remote daemon.
 14/05/21 16:08:25 INFO ApplicationMaster: *finishApplicationMaster with
 SUCCEEDED*
 14/05/21 16:08:25 INFO ApplicationMaster: AppMaster received a signal.
 14/05/21 16:08:25 INFO ApplicationMaster: Deleting staging directory
 .sparkStaging/application_1400696988985_0007
 14/05/21 16:08:25 INFO RemoteActorRefProvider$RemotingTerminator: Remote
 daemon shut down; proceeding with flushing remote transports.
 14/05/21 16:08:25 ERROR *ApplicationMaster: Failed to cleanup staging dir
 .sparkStaging/application_1400696988985_0007*
 *java.io.IOException: Filesystem closed*
 at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:689)
 at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:1685)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem$11.doCall(DistributedFileSystem.java:591)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem$11.doCall(DistributedFileSystem.java:587)
 at
 org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at
 

Re: question about the license of akka and Spark

2014-05-20 Thread Tathagata Das
Akka is under Apache 2 license too.
http://doc.akka.io/docs/akka/snapshot/project/licenses.html


On Tue, May 20, 2014 at 2:16 AM, YouPeng Yang yypvsxf19870...@gmail.com wrote:

 Hi,
   I just found out that Akka is under a commercial license; however, Spark is
 under the Apache license.
   Is there any problem?


 Regards



Re: life if an executor

2014-05-19 Thread Tathagata Das
That's one of the main motivations for using Tachyon ;)
http://tachyon-project.org/

It gives off-heap in-memory caching. And starting with Spark 0.9, you can cache
any RDD in Tachyon just by specifying the appropriate StorageLevel.
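
For example (a sketch, assuming a build where the Tachyon-backed
StorageLevel.OFF_HEAP level is available and Tachyon is configured):

import org.apache.spark.storage.StorageLevel

// Cache the RDD off-heap in Tachyon instead of on the JVM heap.
val data = sc.textFile("hdfs:///some/path")   // placeholder input, sc = SparkContext
data.persist(StorageLevel.OFF_HEAP)
data.count()   // the first action materializes the cache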

TD




On Mon, May 19, 2014 at 10:22 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 I guess it needs to be this way to benefit from caching of RDDs in
 memory. It would be nice, however, if the RDD cache could be dissociated from
 the JVM heap, so that in cases where garbage collection is difficult to
 tune, one could choose to discard the JVM and run the next operation in a
 fresh one.


 On Mon, May 19, 2014 at 10:06 PM, Matei Zaharia 
 matei.zaha...@gmail.com wrote:

 They’re tied to the SparkContext (application) that launched them.

 Matei

 On May 19, 2014, at 8:44 PM, Koert Kuipers ko...@tresata.com wrote:

 from looking at the source code i see executors run in their own jvm
 subprocesses.

 how long do they live for? as long as the worker/slave? or are they tied
 to the sparkcontext and live/die with it?

 thx






Re: Equivalent of collect() on DStream

2014-05-16 Thread Tathagata Das
Doesn't DStream.foreach() suffice?

anyDStream.foreach { rdd =>
  // do something with rdd
}
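
For example, to pull a (small!) batch into driver memory, the usual approach
is a take or collect inside that function:

anyDStream.foreach { rdd =>
  val inMemory = rdd.take(100)   // or rdd.collect() if the RDD is known to be small
  // inMemory is a plain Array on the driver JVM; use it like any local collection
  inMemory.foreach(println)
}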



On Wed, May 14, 2014 at 9:33 PM, Stephen Boesch java...@gmail.com wrote:

 Looking further it appears the functionality I am seeking is in the
 following *private[spark]* class ForEachDStream

 (version 0.8.1; yes, we are presently using an older release..)

 private[streaming]
 class ForEachDStream[T: ClassManifest] (
     parent: DStream[T],
     *foreachFunc: (RDD[T], Time) => Unit*
   ) extends DStream[Unit](parent.ssc) {

 I would like to have access to this structure - particularly the ability
 to define a foreachFunc that gets applied to each RDD within the
 DStream.  Is there a means to do so?



 2014-05-14 21:25 GMT-07:00 Stephen Boesch java...@gmail.com:


 Given that collect() does not exist on DStream, apparently my mental model
 of a streaming RDD (DStream) needs correction/refinement.  So what is the
 means to convert DStream data into a JVM in-memory representation?  All of
 the methods on DStream, i.e. filter, map, transform, reduce, etc., generate
 other DStreams, not an in-memory data structure.







Re: missing method in my slf4j after excluding Spark ZK log4j

2014-05-12 Thread Tathagata Das
This gives the dependency tree in SBT (Spark uses this):
https://github.com/jrudolph/sbt-dependency-graph
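
A rough sketch of wiring that in, purely for reference; the plugin version
and task names below are assumptions, so check the project's README for the
current ones:

// project/plugins.sbt
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4")

Then running the plugin's dependency-tree / dependency-graph task from the
sbt shell prints the resolved tree, so you can see which log4j version is
actually ending up on the classpath.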

TD


On Mon, May 12, 2014 at 4:55 PM, Sean Owen so...@cloudera.com wrote:

 It sounds like you are doing everything right.

 NoSuchMethodError suggests it's finding log4j, just not the right
 version. That method is definitely in 1.2; it might have been removed
 in 2.x? (http://logging.apache.org/log4j/2.x/manual/migration.html)

 So I wonder if something is sneaking in log4j 2.x in your app? that's
 a first guess.

 I'd say consult mvn dependency:tree, but you're on sbt and I don't
 know the equivalent.

 On Mon, May 12, 2014 at 3:51 PM, Adrian Mocanu
 amoc...@verticalscope.com wrote:
  Hey guys,
 
  I've asked before, in Spark 0.9 - I now use 0.9.1, about removing log4j
  dependency and was told that it was gone. However I still find it part of
  zookeeper imports. This is fine since I exclude it myself in the sbt
 file,
  but another issue arises.
 
  I wonder if anyone else has run into this.
 
 
 
  Spark uses log4j v1.2.17 and slf4j-log4j12:1.7.2
 
  I use slf4j 1.7.5, logback 1.0.13, and log4joverslf4j v 1.7.5
 
 
 
  I think my slf4j 1.7.5 doesn't agree with what zookeeper expects in its
  log4j v 1.2.17 because I get missing method error:
 
  java.lang.NoSuchMethodError:
  org.apache.log4j.Logger.setLevel(Lorg/apache/log4j/Level;)V
 
  at
 
 org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58)
 
  at
 
 org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58)
 
  at scala.Option.map(Option.scala:145)
 
  at
  org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:58)
 
  at org.apache.spark.SparkEnv$.create(SparkEnv.scala:126)
 
  at
  org.apache.spark.SparkContext.init(SparkContext.scala:139)
 
  at
 
 org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:500)
 
  at
 
 org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:76)
 
  ...
 
 
 
  Is there a way to find out what versions of slf4j I need to make it work
  with log4j 1.2.17?
 
 
 
  -Adrian
 
 



Re: streaming on hdfs can detected all new file, but the sum of all the rdd.count() not equals which had detected

2014-05-12 Thread Tathagata Das
A very crucial thing to remember when using file streams is that the files
must be written to the monitored directory atomically. That is, once the
file system shows the file in its listing, the file should not be appended to /
updated after that. That often causes this kind of issue, as Spark
Streaming may pick up the file (soon after it is visible in the listing) and may
try to process it even before all of the data has been written.

So the best way to feed data into Spark Streaming is to write the file to a
temp dir, and then move / rename it into the monitored directory.
That makes it atomic. This is mentioned in the API docs of fileStream
(http://spark.apache.org/docs/0.9.1/api/streaming/index.html#org.apache.spark.streaming.StreamingContext).
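
A small sketch of the write-to-temp-then-rename pattern using the Hadoop
FileSystem API (the paths are placeholders):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())

// 1. Write the file somewhere the streaming job is NOT watching.
val tmp  = new Path("/Qimage/_tmp/part-0001.png")    // placeholder paths
val dest = new Path("/Qimage/input/part-0001.png")
// ... write and close the file at tmp here ...

// 2. Rename into the monitored directory. On HDFS a rename is atomic,
//    so the file only ever appears fully written.
fs.rename(tmp, dest)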

TD



On Sun, May 11, 2014 at 7:30 PM, zqf12345 zqf12...@gmail.com wrote:

 When I put 200 PNG files into HDFS, I found that Spark Streaming could detect 200
 files, but the sum of rdd.count() is less than 200, always between 130 and
 170. I don't know why... Is this a bug?
 PS: When I put the 200 files into HDFS before the streaming job runs, it gets the
 correct count and the right result.

 Here is the code:

 def main(args: Array[String]) {
 val conf = new SparkConf().setMaster(SparkURL)
 .setAppName("QimageStreaming-broadcast")
 .setSparkHome(System.getenv("SPARK_HOME"))
 .setJars(SparkContext.jarOfClass(this.getClass()))
 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
 conf.set("spark.kryo.registrator", "qing.hdu.Image.MyRegistrator")
 conf.set("spark.kryoserializer.buffer.mb", "10")
 val ssc = new StreamingContext(conf, Seconds(2))
 val inputFormatClass = classOf[QimageInputFormat[Text, Qimage]]
 val outputFormatClass = classOf[QimageOutputFormat[Text, Qimage]]
 val input_path = HdfsURL + "/Qimage/input"
 val output_path = HdfsURL + "/Qimage/output/"
 val bg_path = HdfsURL + "/Qimage/bg/"
 val bg = ssc.sparkContext.newAPIHadoopFile[Text, Qimage,
 QimageInputFormat[Text, Qimage]](bg_path)
 val bbg = bg.map(data => (data._1.toString(), data._2))
 val broadcastbg = ssc.sparkContext.broadcast(bbg)
 val file = ssc.fileStream[Text, Qimage, QimageInputFormat[Text,
 Qimage]](input_path)
 val qingbg = broadcastbg.value.collectAsMap
 val foreachFunc = (rdd: RDD[(Text, Qimage)], time: Time) => {
 val rddnum = rdd.count
 System.out.println("\n\n" + "rddnum is " + rddnum + "\n\n")
 if (rddnum > 0)
 {
 System.out.println("here is foreachFunc")
 val a = rdd.keys
 val b = a.first
 val cbg = qingbg.get(getbgID(b)).getOrElse(new Qimage)
 rdd.map(data => (data._1, (new QimageProc(data._1, data._2)).koutu(cbg)))
 .saveAsNewAPIHadoopFile(output_path, classOf[Text], classOf[Qimage],
 outputFormatClass) }
 }
 file.foreachRDD(foreachFunc)
 ssc.start()
 ssc.awaitTermination()
 }



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/streaming-on-hdfs-can-detected-all-new-file-but-the-sum-of-all-the-rdd-count-not-equals-which-had-ded-tp5572.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Average of each RDD in Stream

2014-05-12 Thread Tathagata Das
Use DStream.foreachRDD to do an operation on the final RDD of every batch.

val sumandcount = numbers.map(n => (n.toDouble, 1)).reduce { (a, b) => (a._1
+ b._1, a._2 + b._2) }
sumandcount.foreachRDD { rdd => val first: (Double, Int) = rdd.take(1)(0); ... }

DStream.reduce creates a DStream whose RDDs have just one tuple each. The
rdd.take(1) above gets that one tuple.
However, note that there is a corner case in this approach. If in a
particular batch there is no data, then the rdd will have zero elements
(no data, nothing to reduce). So you have to take that into account (maybe
do a rdd.collect(), check the size, and then get the first / only element).
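
Putting it together, a small sketch that also handles the empty-batch case
(variable names are illustrative):

val sumAndCount = numbers.map(n => (n.toDouble, 1))
                         .reduce { (a, b) => (a._1 + b._1, a._2 + b._2) }
sumAndCount.foreachRDD { rdd =>
  val collected = rdd.collect()      // at most one (sum, count) tuple per batch after reduce
  if (collected.nonEmpty) {
    val (sum, count) = collected(0)
    println("Average of this batch = " + sum / count)
  }
}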

TD



On Wed, May 7, 2014 at 7:59 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

 Hi,

 I use the following code for calculating average. The problem is that the
 reduce operation return a DStream here and not a tuple as it normally does
 without Streaming. So how can we get the sum and the count from the
 DStream. Can we cast it to tuple?

 val numbers = ssc.textFileStream(args(1))
  val sumandcount = numbers.map(n => (n.toDouble, 1)).reduce { (a, b) =>
  (a._1 + b._1, a._2 + b._2) }
 sumandcount.print()

 Regards,
 Laeeq




Re: Proper way to stop Spark stream processing

2014-05-12 Thread Tathagata Das
Since you are using the latest Spark code and not Spark 0.9.1 (guessed from
the log messages), you can actually do a graceful shutdown of a streaming
context. This ensures that the receivers are properly stopped and all
received data is processed, and only then does the system terminate (stop()
stays blocked until then). See the other variations of streamingContext.stop().
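
A rough sketch of what that looks like for the approach below (the two-flag
stop() variant is in Spark 1.0+; N is your own timeout in seconds, and the
future block assumes scala.concurrent.future and an ExecutionContext in scope):

val ssc = new StreamingContext(...)
ssc.start()
future {
  Thread.sleep(N * 1000L)                                   // N seconds, in millis
  ssc.stop(stopSparkContext = true, stopGracefully = true)  // blocks until received data is processed
}
ssc.awaitTermination()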

TD

On Mon, May 12, 2014 at 2:49 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hello,

 I am trying to implement something like process a stream for N
 seconds, then return a result with Spark Streaming (built from git
 head). My approach (which is probably not very elegant) is

 val ssc = new StreamingContext(...)
 ssc.start()
 future {
   Thread.sleep(Seconds(N))
   ssc.stop(true)
 }
 ssc.awaitTermination()

 and in fact, this stops the stream processing. However, I get the
 following error messages:

 14/05/12 18:41:49 ERROR scheduler.ReceiverTracker: Deregistered
 receiver for stream 0: Stopped by driver
 14/05/12 18:41:49 ERROR scheduler.ReceiverTracker: Deregistered
 receiver for stream 0: Restarting receiver with delay 2000ms: Retrying
 connecting to localhost:
 14/05/12 18:41:50 ERROR network.ConnectionManager: Corresponding
 SendingConnectionManagerId not found
 14/05/12 18:41:50 ERROR network.ConnectionManager: Corresponding
 SendingConnectionManagerId not found

 (where localhost: is the source I am reading the stream from).
 This doesn't actually seem like the proper way to do it. Can anyone
 point me to how to implement stop after N seconds without these
 error messages?

 Thanks
 Tobias



Re: How to use spark-submit

2014-05-07 Thread Tathagata Das
Doesn't the run-example script work for you? Also, are you on the latest
commit of branch-1.0?
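
For reference, a rough sketch of a typical spark-submit invocation on
branch-1.0 (class name, paths and master URL are placeholders; --jars ships
extra dependency jars, e.g. the external streaming modules, to the executors,
which is the usual fix for ClassNotFoundException on the workers):

./bin/spark-submit \
  --class com.example.MyStreamingApp \
  --master spark://master-host:7077 \
  --jars /path/to/spark-streaming-kafka_2.10.jar,/path/to/other-deps.jar \
  /path/to/my-app.jar arg1 arg2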

TD


On Mon, May 5, 2014 at 7:51 PM, Soumya Simanta soumya.sima...@gmail.comwrote:



 Yes, I'm struggling with a similar problem where my class are not found on
 the worker nodes. I'm using 1.0.0_SNAPSHOT.  I would really appreciate if
 someone can provide some documentation on the usage of spark-submit.

 Thanks

  On May 5, 2014, at 10:24 PM, Stephen Boesch java...@gmail.com wrote:
 
 
  I have a spark streaming application that uses the external streaming
 modules (e.g. kafka, mqtt, ..) as well.  It is not clear how to properly
  invoke the spark-submit script: what are the --driver-class-path and/or
  -Dspark.executor.extraClassPath parameters required?
 
   For reference, the following error is proving difficult to resolve:
 
  java.lang.ClassNotFoundException:
 org.apache.spark.streaming.examples.StreamingExamples
 



Re: sbt run with spark.ContextCleaner ERROR

2014-05-07 Thread Tathagata Das
Okay, this needs to be fixed. Thanks for reporting this!



On Mon, May 5, 2014 at 11:00 PM, wxhsdp wxh...@gmail.com wrote:

 Hi, TD

 i tried on v1.0.0-rc3 and still got the error



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/sbt-run-with-spark-ContextCleaner-ERROR-tp5304p5421.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: spark streaming question

2014-05-05 Thread Tathagata Das
One main reason why Spark Streaming can achieve higher throughput than
Storm is that Spark Streaming operates on coarser-grained batches -
second-scale, massive batches - which reduce per-tuple overheads in
shuffles and other kinds of data movement.

Note that this increased throughput does not come for free: larger batches
mean larger end-to-end latency. Storm may give a lower end-to-end latency
than Spark Streaming (which has second-scale latency with second-scale
batches). However, we have observed that for a large variety of streaming
use cases, people are often okay with second-scale latencies
but find it much harder to work around Storm's at-least-once
semantics (double-counting, etc.) and lack of built-in state management
(state kept locally in a worker can get lost if the worker dies). Plus Spark
Streaming has the major advantage of a simpler, higher-level API
than Storm and the whole Spark ecosystem (Spark SQL, MLlib, etc.) around it
that it can use for writing streaming analytics applications very easily.

Regarding Trident, we have heard from many developers that Trident gives
lower throughput than Storm due to its transactional guarantees. It's hard
to say what the reasons behind the performance penalty are without doing a very
detailed head-to-head analysis.

TD


On Sun, May 4, 2014 at 5:11 PM, Chris Fregly ch...@fregly.com wrote:

 great questions, weide.  in addition, i'd also like to hear more about how
 to horizontally scale a spark-streaming cluster.

 i've gone through the samples (standalone mode) and read the
 documentation, but it's still not clear to me how to scale this puppy out
 under high load.  i assume i add more receivers (kinesis, flume, etc), but
 physically how does this work?

 @TD:  can you comment?

 thanks!

 -chris


 On Sun, May 4, 2014 at 2:10 PM, Weide Zhang weo...@gmail.com wrote:

 Hi ,

 It might be a very general question to ask here but I'm curious to know
 why spark streaming can achieve better throughput than storm as claimed in
 the spark streaming paper. Does it depend on certain use cases and/or data
 source ? What drives better performance in spark streaming case or in other
 ways, what makes storm not as performant as spark streaming ?

 Also, in order to guarantee exact-once semantics when node failure
 happens,  spark makes replicas of RDDs and checkpoints so that data can be
 recomputed on the fly while on Trident case, they use transactional object
 to persist the state and result but it's not obvious to me which approach
 is more costly and why ? Any one can provide some experience here ?

 Thanks a lot,

 Weide





Re: Spark Streaming and JMS

2014-05-05 Thread Tathagata Das
A few high-level suggestions.

1. I recommend using the new Receiver API in almost-released Spark 1.0 (see
branch-1.0 / master branch on github). It's a slightly better version of the
earlier NetworkReceiver, as it hides away the BlockGenerator (which needed to
be unnecessarily manually started and stopped) and adds other lifecycle
management methods like stop, restart, reportError to deal with errors in
receiving data. It also adds the ability to write a custom receiver from Java.
Take a look at this example of writing a custom receiver in the new API:
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
I am updating the custom receiver guide right now
(https://github.com/apache/spark/pull/652).

2. Once you create a JMSReceiver class by extending
NetworkReceiver/Receiver, you can create DStream out of the receiver by

val jmsStream = ssc.networkStream(new JMSReceiver())

3. As far as I understand from the docs of akka.camel.Consumer
(http://doc.akka.io/api/akka/2.3.2/index.html#akka.camel.Consumer),
it is essentially a specialized Akka actor. For Akka actors, there is
ssc.actorStream, where you can specify your own actor class. You get actor
supervision (and therefore error handling, etc.) with that. See the example
ActorWordCount - old style using NetworkReceiver
(https://github.com/apache/spark/blob/branch-0.9/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala),
or new style using Receiver
(https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala).

I haven't personally played around with JMS before so I can't comment much on
JMS-specific intricacies.
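
For illustration, a minimal, untested sketch of a receiver skeleton against the
new Spark 1.0 Receiver API (the JMS / Camel wiring is left as comments; the
class name and URI are placeholders, and in 1.0 the DStream is created with
ssc.receiverStream, as far as I recall):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class JmsReceiver(jmsURI: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {
  def onStart() {
    // Set up the JMS / Camel consumer here (non-blocking) and call store(messageBody)
    // for every message received. On connection errors, call
    // restart("Retrying connection", exception) so Spark restarts the receiver.
  }
  def onStop() {
    // Tear down the JMS connection / Camel context here.
  }
}

// val jmsStream = ssc.receiverStream(new JmsReceiver("jms:sonicmq://localhost:2506/queue?destination=SampleQ1"))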

TD



On Mon, May 5, 2014 at 5:31 AM, Patrick McGloin
mcgloin.patr...@gmail.comwrote:

 Hi all,

 Is there a best practice for subscribing to JMS with Spark Streaming?  I
 have searched but not found anything conclusive.

 In the absence of a standard practice the solution I was thinking of was
 to use Akka + Camel (akka.camel.Consumer) to create a subscription for a
 Spark Streaming Custom Receiver.  So the actor would look something like
 this:

 class JmsReceiver(jmsURI: String) extends NetworkReceiver[String] with
 Consumer {
   //e.g. jms:sonicmq://localhost:2506/queue?destination=SampleQ1
   def endpointUri = jmsURI
   lazy val blockGenerator = new
 BlockGenerator(StorageLevel.MEMORY_ONLY_SER)

   protected override def onStart() {
 blockGenerator.start
   }

   def receive = {
  case msg: CamelMessage => { blockGenerator += msg.body }
  case _ => { /* ... */ }
   }

   protected override def onStop() {
 blockGenerator.stop
   }
 }

 And then in the main application create receivers like this:

 val ssc = new StreamingContext(...)
 object tascQueue extends JmsReceiver[String](ssc) {
 override def getReceiver():JmsReceiver[String] = {
   new JmsReceiver("jms:sonicmq://localhost:2506/queue?destination=TascQueue")
  }
 }
 ssc.registerInputStream(tascQueue)

 Is this the best way to go?

 Best regards,
 Patrick



Re: spark run issue

2014-05-04 Thread Tathagata Das
All the stuff in lib_managed is what gets downloaded by sbt/maven when you
compile. Those jars are necessary for running Spark, Spark Streaming, etc. But
you should not have to add all that to the classpath individually and manually
when running Spark programs. If you are trying to run your Spark program
locally, you should use sbt or maven to compile your project with Spark as
a dependency, and sbt/maven will take care of putting all the necessary
jars in the classpath (when you run your program with sbt/maven). If
you are trying to run your Spark program on a cluster, then refer to
the deployment guide (http://spark.apache.org/docs/latest/cluster-overview.html). To run
a Spark standalone cluster, you just have to compile Spark and place the
whole Spark directory on the worker nodes. For other deploy modes like YARN
and Mesos, you should just compile Spark into a big all-inclusive jar and
supply that when you launch your program on YARN/Mesos. See the guide for
more details.
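
For the local case, a minimal sketch of an sbt build that declares Spark as a
dependency (artifact names are the published ones; versions are illustrative):

name := "my-streaming-app"

scalaVersion := "2.10.3"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "0.9.1",
  "org.apache.spark" %% "spark-streaming"       % "0.9.1",
  "org.apache.spark" %% "spark-streaming-kafka" % "0.9.1"
)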

TD


On Sat, May 3, 2014 at 7:24 PM, Weide Zhang weo...@gmail.com wrote:

 Hi Tathagata,

 I actually have a separate question. What's the usage of lib_managed
 folder inside spark source folder ? Are those the library required for
 spark streaming to run ? Do they needed to be added to spark classpath when
 starting sparking cluster?

 Weide


 On Sat, May 3, 2014 at 7:08 PM, Weide Zhang weo...@gmail.com wrote:

 Hi Tathagata,

 I figured out the reason. I was adding a wrong kafka lib along side with
 the version spark uses. Sorry for spamming.

 Weide


 On Sat, May 3, 2014 at 7:04 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 I am a little confused about the version of Spark you are using. Are you
 using Spark 0.9.1 that uses scala 2.10.3 ?

 TD


 On Sat, May 3, 2014 at 6:16 PM, Weide Zhang weo...@gmail.com wrote:

 Hi I'm trying to run the kafka-word-count example in spark2.9.1. I
 encountered some exception when initialize kafka consumer/producer config.
 I'm using scala 2.10.3 and used maven build inside spark streaming kafka
 library comes with spark2.9.1. Any one see this exception before?

 Thanks,

 producer:
  [java] The args attribute is deprecated. Please use nested arg
 elements.
  [java] Exception in thread main java.lang.NoClassDefFoundError:
 scala/Tuple2$mcLL$sp
  [java] at
 kafka.producer.ProducerConfig.init(ProducerConfig.scala:56)
  [java] at
 com.turn.apache.KafkaWordCountProducer$.main(HelloWorld.scala:89)
  [java] at
 com.turn.apache.KafkaWordCountProducer.main(HelloWorld.scala)
  [java] Caused by: java.lang.ClassNotFoundException:
 scala.Tuple2$mcLL$sp
  [java] at
 java.net.URLClassLoader$1.run(URLClassLoader.java:202)
  [java] at java.security.AccessController.doPrivileged(Native
 Method)
  [java] at
 java.net.URLClassLoader.findClass(URLClassLoader.java:190)
  [java] at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
  [java] at
 sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
  [java] at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
  [java] ... 3 more
  [java] Java Result: 1







Re: sbt run with spark.ContextCleaner ERROR

2014-05-04 Thread Tathagata Das
Can you tell which version of Spark you are using? Spark 1.0 RC3, or
something intermediate?
And do you call sparkContext.stop at the end of your application? If so,
does this error occur before or after the stop()?

TD


On Sun, May 4, 2014 at 2:40 AM, wxhsdp wxh...@gmail.com wrote:

 Hi, all

 i use sbt to run my spark application, after the app completes, error
 occurs:

 14/05/04 17:32:28 INFO network.ConnectionManager: Selector thread was
 interrupted!
 14/05/04 17:32:28 ERROR spark.ContextCleaner: Error in cleaning thread
 java.lang.InterruptedException
 at java.lang.Object.wait(Native Method)
 at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135)
 at
 org.apache.spark.ContextCleaner.org
 $apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:116)
 at
 org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:64)

 has anyone met this before?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/sbt-run-with-spark-ContextCleaner-ERROR-tp5304.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: another updateStateByKey question

2014-05-02 Thread Tathagata Das
Could be a bug. Can you share the code, along with some data, that I can use
to reproduce this?

TD
On May 2, 2014 9:49 AM, Adrian Mocanu amoc...@verticalscope.com wrote:

  Has anyone else noticed that *sometimes* the same tuple calls update
 state function twice?

 I have 2 tuples with the same key in 1 RDD part of DStream: RDD[ (a,1),
 (a,2) ]

 When the update function is called the first time Seq[V] has data: 1, 2
 which is correct: StateClass(3,2, ArrayBuffer(1, 2))

 Then right away (in my output I see this) the same key is used and the
 function is called again but this time Seq is empty: StateClass(3,2,
 ArrayBuffer( ))



 In the update function I also save Seq[V] to state so I can see it in the
 RDD. I also show a count and sum of the values.

 StateClass(sum, count, Seq[V])



 Why is the update function called with empty Seq[V] on the same key when
 all values for that key have been already taken care of in a previous
 update?



 -Adrian





Re: Spark streaming

2014-05-01 Thread Tathagata Das
Take a look at the RDD.pipe() operation. That allows you to pipe the data
in an RDD to any external shell command (just like a Unix shell pipe).
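
As a small illustration (the command is arbitrary; any script in another
language works the same way, reading lines on stdin and writing lines to
stdout):

val lines = sc.parallelize(Seq("spark", "hadoop", "storm"))
// Each partition's elements are written line-by-line to the command's stdin;
// the command's stdout lines come back as a new RDD[String].
val upper = lines.pipe("tr 'a-z' 'A-Z'")
upper.collect().foreach(println)   // SPARK, HADOOP, STORM
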
On May 1, 2014 10:46 AM, Mohit Singh mohit1...@gmail.com wrote:

 Hi,
   I guess Spark is using streaming in context of streaming live data but
 what I mean is something more on the lines of  hadoop streaming.. where one
 can code in any programming language?
 Or is something among that lines on the cards?

 Thanks
 --
 Mohit

 When you want success as badly as you want the air, then you will get it.
 There is no other secret of success.
 -Socrates



Re: What is Seq[V] in updateStateByKey?

2014-05-01 Thread Tathagata Das
Depends on your code. Referring to the earlier example, if you do

words.map(x => (x, 1)).updateStateByKey(...)

then for a particular word, if a batch contains 6 occurrences of that word,
then the Seq[V] will be [1, 1, 1, 1, 1, 1].

Instead, if you do

words.map(x => (x, 1)).reduceByKey(_ + _).updateStateByKey(...)

then Seq[V] will be [ 6 ], that is, all the 1s will have been summed up already
due to the reduceByKey.
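
To make the types concrete, a small sketch of a running word count along these
lines (newValues is the Seq[V] for the current batch, runningCount the
Option[S] state):

val updateFunc = (newValues: Seq[Int], runningCount: Option[Int]) => {
  // new running count = old running count (0 if absent) + counts that arrived in this batch
  Some(runningCount.getOrElse(0) + newValues.sum)
}
val runningCounts = words.map(x => (x, 1)).updateStateByKey[Int](updateFunc)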

TD



On Thu, May 1, 2014 at 7:29 AM, Adrian Mocanu amoc...@verticalscope.comwrote:

 So Seq[V] contains only new tuples. I initially thought that whenever a
 new tuple was found, it would add it to Seq and call the update function
 immediately so there wouldn't be more than 1 update to Seq per function
 call.

 Say I want to sum tuples with the same key is an RDD using
 updateStateByKey, Then (1) Seq[V] would contain the numbers for a
 particular key and my S state could be the sum?
 Or would (2) Seq contain partial sums (say sum per partition?) which I
 then need to sum into the final sum?

 After writing this out and thinking a little more about it I think #2 is
 correct. Can you confirm?

 Thanks again!
 -A

 -Original Message-
 From: Sean Owen [mailto:so...@cloudera.com]
 Sent: April-30-14 4:30 PM
 To: user@spark.apache.org
 Subject: Re: What is Seq[V] in updateStateByKey?

 S is the previous count, if any. Seq[V] are potentially many new counts.
 All of them have to be added together to keep an accurate total.  It's as
 if the count were 3, and I tell you I've just observed 2, 5, and 1
 additional occurrences -- the new count is 3 + (2+5+1) not
 1 + 1.


 I butted in since I'd like to ask a different question about the same line
 of code. Why:

   val currentCount = values.foldLeft(0)(_ + _)

 instead of

   val currentCount = values.sum

 This happens a few places in the code. sum seems equivalent and likely
 quicker. Same with things like filter(_ == 200).size instead of count(_
 == 200)... pretty trivial but hey.


 On Wed, Apr 30, 2014 at 9:23 PM, Adrian Mocanu amoc...@verticalscope.com
 wrote:
  Hi TD,
 
  Why does the example keep recalculating the count via fold?
 
  Wouldn’t it make more sense to get the last count in values Seq and
  add 1 to it and save that as current count?
 
 
 
  From what Sean explained I understand that all values in Seq have the
  same key. Then when a new value for that key is found it is added to
  this Seq collection and the update function is called.
 
 
 
  Is my understanding correct?



Re: range partitioner with updateStateByKey

2014-05-01 Thread Tathagata Das
Ordered by what? arrival order? sort order?

TD



On Thu, May 1, 2014 at 2:35 PM, Adrian Mocanu amoc...@verticalscope.comwrote:

  If I use a range partitioner, will this make updateStateByKey take the
 tuples in order?

 Right now I see them not being taken in order (most of them are ordered
 but not all)



 -Adrian





Re: What is Seq[V] in updateStateByKey?

2014-04-30 Thread Tathagata Das
Yeah, I remember changing fold to sum in a few places, probably in
testsuites, but missed this example I guess.



On Wed, Apr 30, 2014 at 1:29 PM, Sean Owen so...@cloudera.com wrote:

 S is the previous count, if any. Seq[V] are potentially many new
 counts. All of them have to be added together to keep an accurate
 total.  It's as if the count were 3, and I tell you I've just observed
 2, 5, and 1 additional occurrences -- the new count is 3 + (2+5+1) not
 1 + 1.


 I butted in since I'd like to ask a different question about the same
 line of code. Why:

   val currentCount = values.foldLeft(0)(_ + _)

 instead of

   val currentCount = values.sum

 This happens a few places in the code. sum seems equivalent and likely
 quicker. Same with things like filter(_ == 200).size instead of
 count(_ == 200)... pretty trivial but hey.


 On Wed, Apr 30, 2014 at 9:23 PM, Adrian Mocanu
 amoc...@verticalscope.com wrote:
  Hi TD,
 
  Why does the example keep recalculating the count via fold?
 
  Wouldn’t it make more sense to get the last count in values Seq and add
 1 to
  it and save that as current count?
 
 
 
  From what Sean explained I understand that all values in Seq have the
 same
  key. Then when a new value for that key is found it is added to this Seq
  collection and the update function is called.
 
 
 
  Is my understanding correct?



Re: What is Seq[V] in updateStateByKey?

2014-04-29 Thread Tathagata Das
You may have already seen it, but I will mention it anyways. This example
may help.
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala

Here the state is essentially a running count of the words seen. So the
value type (i.e., V) is Int (count of a word in each batch) and the state
type (i.e., S) is also an Int (running count). The update function essentially
sums up the running count with the new count to generate a new running
count.

TD



On Tue, Apr 29, 2014 at 1:49 PM, Sean Owen so...@cloudera.com wrote:

 The original DStream is of (K,V). This function creates a DStream of
 (K,S). Each time slice brings one or more new V for each K. The old
 state S (can be different from V!) for each K -- possibly non-existent
 -- is updated in some way by a bunch of new V, to produce a new state
 S -- which also might not exist anymore after update. That's why the
 function is from a Seq[V], and an Option[S], to an Option[S].

 If you RDD has value type V = Double then your function needs to
 update state based on a new Seq[Double] at each time slice, since
 Doubles are the new thing arriving for each key at each time slice.


 On Tue, Apr 29, 2014 at 7:50 PM, Adrian Mocanu
 amoc...@verticalscope.com wrote:
  What is Seq[V] in updateStateByKey?
 
  Does this store the collected tuples of the RDD in a collection?
 
 
 
  Method signature:
 
  def updateStateByKey[S: ClassTag] ( updateFunc: (Seq[V], Option[S]) =>
  Option[S] ): DStream[(K, S)]
 
 
 
  In my case I used Seq[Double] assuming a sequence of Doubles in the RDD;
 the
  moment I switched to a different type like Seq[(String, Double)] the code
  didn’t compile.
 
 
 
  -Adrian
 
 



Re: Spark's behavior

2014-04-29 Thread Tathagata Das
Strange! Can you just do lines.print() to print the raw data instead of
doing the word count? Beyond that we can do two things.

1. Can you see in the Spark stage UI whether there are stages running
during the 30 second period you referred to?
2. If you upgrade to using the Spark master branch (or Spark 1.0 RC3, see
the different thread by Patrick), it has a streaming UI, which shows the number
of records received, the state of the receiver, etc. That may be more
useful in debugging what's going on.

TD


On Tue, Apr 29, 2014 at 3:31 PM, Eduardo Costa Alfaia 
e.costaalf...@unibs.it wrote:

 Hi TD,
 We are not using stream context with master local, we have 1 Master and 8
 Workers and 1 word source. The command line that we are using is:
 bin/run-example org.apache.spark.streaming.examples.JavaNetworkWordCount
 spark://192.168.0.13:7077

 On Apr 30, 2014, at 0:09, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 Is you batch size 30 seconds by any chance?

 Assuming not, please check whether you are creating the streaming context
 with master local[n] where n > 2. With local or local[1], the system
 only has one processing slot, which is occupied by the receiver leaving no
 room for processing the received data. It could be that after 30 seconds,
 the server disconnects, the receiver terminates, releasing the single slot
 for the processing to proceed.

 TD


 On Tue, Apr 29, 2014 at 2:28 PM, Eduardo Costa Alfaia 
 e.costaalf...@unibs.it wrote:

 Hi TD,

 In my tests with spark streaming, I'm using
 JavaNetworkWordCount(modified) code and a program that I wrote that sends
 words to the Spark worker, I use TCP as transport. I verified that after
 starting Spark, it connects to my source which actually starts sending, but
 the first word count is advertised approximately 30 seconds after the
 context creation. So I'm wondering where is stored the 30 seconds data
 already sent by the source. Is this a normal spark’s behaviour? I saw the
 same behaviour using the shipped JavaNetworkWordCount application.

 Many thanks.
 --
 Informativa sulla Privacy: http://www.unibs.it/node/8155




 Informativa sulla Privacy: http://www.unibs.it/node/8155



Re: Java Spark Streaming - SparkFlumeEvent

2014-04-28 Thread Tathagata Das
You can get the internal AvroFlumeEvent inside the SparkFlumeEvent using
SparkFlumeEvent.event. That should probably give you all the original text
data.
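
A hedged Scala sketch of pulling the line text back out of each event (the same
calls are available from Java; the charset is an assumption):

flumeStream.foreachRDD { rdd =>
  rdd.collect().foreach { sparkEvent =>
    val avroEvent = sparkEvent.event            // the wrapped AvroFlumeEvent
    val body = avroEvent.getBody                // java.nio.ByteBuffer with the raw bytes of the line
    val bytes = new Array[Byte](body.remaining())
    body.get(bytes)
    val line = new String(bytes, "UTF-8")
    // parse `line` into fields here and persist as needed
  }
}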



On Mon, Apr 28, 2014 at 5:46 AM, Kulkarni, Vikram vikram.kulka...@hp.comwrote:

  Hi Spark-users,

   Within my Spark Streaming program, I am able to ingest data sent by my
 Flume Avro Client. I configured a ‘spooling directory source’ to write data
 to a Flume Avro Sink (the Spark Streaming Driver program in this case). The
 default deserializer i.e. LINE is used to parse the file into events.
 Therefore I am expecting an event (SparkFlumeEvent) for every line in the
 log file.



 My Spark Streaming Code snippet here:



 System.out.println("Setting up Flume Stream using Avro Sink at: " + avroServer + ":" + avroPort);

 //JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("XXX.YYY.XXX.YYY", port);
 JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, avroServer, avroPort);

 flumeStream.count();

 flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
     @Override
     public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
         List<SparkFlumeEvent> events = eventsData.collect();
         Iterator<SparkFlumeEvent> batchedEvents = events.iterator();

         System.out.println("Received Spark Flume Events: " + events.size());
         while (batchedEvents.hasNext()) {
             SparkFlumeEvent flumeEvent = batchedEvents.next();
             //System.out.println("SparkFlumeEvent = " + flumeEvent);
             //System.out.println(flumeEvent.toString());

             //TODO: How to build each line in the file using this SparkFlumeEvent object?
         }
         return null;
     }
 });



 Within this while loop, how do I extract each line that was streamed using
 the SparkFlumeEvent object? I intend to then parse this line, extract
 various fields and then persist it to memory.



 Regards,

 Vikram





Re: Bind exception while running FlumeEventCount

2014-04-22 Thread Tathagata Das
Hello Neha,

This is the result of a known bug in 0.9. Can you try running the latest
Spark master branch to see if this problem is resolved?

TD


On Tue, Apr 22, 2014 at 2:48 AM, NehaS Singh nehas.si...@lntinfotech.comwrote:

  Hi,

 I have installed
 spark-0.9.0-incubating-bin-cdh4 and I am using apache flume for streaming.
 I have used the streaming.examples.FlumeEventCount. Also I have written
 Avro conf file for flume.When I try to do streamin ing spark and I run
 the following command it throws error



 ./bin/run-example org.apache.spark.streaming.examples.FlumeEventCount
 local[2] ip 10001



 org.jboss.netty.channel.ChannelException: Failed to bind to: /ip:9988

 at
 org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)

 at org.apache.avro.ipc.NettyServer.init(NettyServer.java:106)

 at org.apache.avro.ipc.NettyServer.init(NettyServer.java:119)

 at org.apache.avro.ipc.NettyServer.init(NettyServer.java:74)

 at org.apache.avro.ipc.NettyServer.init(NettyServer.java:68)

 at
 org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:143)

 at
 org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:126)

 at
 org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:173)

 at
 org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:169)

 at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)

 at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)

 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)

 at org.apache.spark.scheduler.Task.run(Task.scala:53)

 at
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)

 at
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)

 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)

 at java.lang.Thread.run(Thread.java:662)

 Caused by: java.net.BindException: Cannot assign requested address

 at sun.nio.ch.Net.bind(Native Method)

 at
 sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)

 at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)

 at
 org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)

 at
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)

 at
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)

 at
 org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)

 ... 3 more



 Can you please help



 This is my flumetest.conf



 a1.sources = tail-file

 a1.channels = c1

 a1.sinks=avro-sink



 # define the flow

 a1.sources.tail-file.channels = c1

 a1.sinks.avro-sink.channels = c1

 a1.channels.c1.type = memory

 a1.channels.c1.capacity = 1000



 # define source and sink

 a1.sources.tail-file.type = exec

 a1.sources.tail-file.command = tail -F /home/hduser/Flume/test.log

 #a1.sources.tail-file.channels = c1

 a1.sinks.avro-sink.type = avro

 a1.sinks.avro-sink.hostname = ip   // agent a2.s ip address or host name

 a1.sinks.avro-sink.port = 10001

 # Use a channel which buffers events in memory

 a1.channels.c1.type = memory

 a1.channels.c1.capacity = 1

 a1.channels.c1.transactionCapacity = 1



 # Bind the source and sink to the channel

 a1.sources.tail-file.channels = c1

 a1.sinks.avro-sink.channel = c1



 Flumeslavetest.conf:-



 a2.sources = avro-collection-source

 a2.sinks = hdfs-sink

 a2.channels = mem-channel



 # define the flow

 a2.sources.avro-collection-source.channels = mem-channel

 a2.sinks.hdfs-sink.channel = mem-channel

 a2.channels.mem-channel.type = memory

 a2.channels.mem-channel.capacity = 1000



 # avro source properties

 a2.sources.avro-collection-source.type = avro

 a2.sources.avro-collection-source.bind = ip   // agent a2.s ip address or
 host name

 a2.sources.avro-collection-source.port = 10001



 # hdfs sink properties

 a2.sinks.hdfs-sink.type = hdfs

 a2.sinks.hdfs-sink.hdfs.writeFormat = Text

 a2.sinks.hdfs-sink.hdfs.filePrefix =  testing

 a2.sinks.hdfs-sink.hdfs.path = hdfs://ip:8020/testingData





 My flume is running properly its is able to write file on hdfs.



 Please help as to how to resolve the error



 Regards,

 Neha Singh



 --
 The contents of this e-mail and any attachment(s) may contain 

Re: Strange behaviour of different SSCs with same Kafka topic

2014-04-21 Thread Tathagata Das
Are you by any chance starting two StreamingContexts in the same JVM? That
could explain a lot of the weird mixing of data that you are seeing. It's
not a supported usage scenario to start multiple StreamingContexts
simultaneously in the same JVM.

TD


On Thu, Apr 17, 2014 at 10:58 PM, gaganbm gagan.mis...@gmail.com wrote:

 It happens with normal data rate, i.e., lets say 20 records per second.

 Apart from that, I am also getting some more strange behavior. Let me
 explain.

 I establish two sscs. Start them one after another. In SSCs I get the
 streams from Kafka sources, and do some manipulations. Like, adding some
 Record_Name for example, to each of the incoming records. Now this
 Record_Name is different for both the SSCs, and I get this field from some
 other class, not relevant to the streams.

 Now, expected behavior should be, all records in SSC1 gets added with the
 field RECORD_NAME_1 and all records in SSC2 should get added with the field
 RECORD_NAME_2. Both the SSCs have nothing to do with each other as I
 believe.

 However, strangely enough, I find many records in SSC1 get added with
 RECORD_NAME_2 and vice versa. Is it some kind of serialization issue ?
 That, the class which provides this RECORD_NAME gets serialized and is
 reconstructed and then some weird thing happens inside ? I am unable to
 figure out.

 So, apart from skewed frequency and volume of records in both the streams,
 I am getting this inter-mingling of data among the streams.

 Can you help me in how to use some external data to manipulate the RDD
 records ?

 Thanks and regards

 Gagan B Mishra


 *Programmer*
 *560034, Bangalore*
 *India*


  On Tue, Apr 15, 2014 at 4:09 AM, Tathagata Das [via Apache Spark User
  List] wrote:

 Does this happen at low event rate for that topic as well, or only for a
 high volume rate?

 TD


  On Wed, Apr 9, 2014 at 11:24 PM, gaganbm wrote:

 I am really at my wits' end here.

 I have different Streaming contexts, lets say 2, and both listening to
 same
 Kafka topics. I establish the KafkaStream by setting different consumer
 groups to each of them.

 Ideally, I should be seeing the kafka events in both the streams. But
 what I
 am getting is really unpredictable. Only one stream gets a lot of events
 and
 the other one almost gets nothing or very less compared to the other.
 Also
 the frequency is very skewed. I get a lot of events in one stream
 continuously, and after some duration I get a few events in the other
 one.

 I don't know where I am going wrong. I can see consumer fetcher threads
 for
 both the streams that listen to the Kafka topics.

 I can give further details if needed. Any help will be great.

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Strange-behaviour-of-different-SSCs-with-same-Kafka-topic-tp4050.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.







 --
  View this message in context:
  http://apache-spark-user-list.1001560.n3.nabble.com/Strange-behaviour-of-different-SSCs-with-same-Kafka-topic-tp4050p4434.html

  Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: question about the SocketReceiver

2014-04-21 Thread Tathagata Das
As long as the socket server sends data through the same connection, the
existing code is going to work. The socket.getInputStream returns an input
stream which will continuously allow you to pull data sent over the
connection. The bytesToObjects function continuously reads data from the
input stream and converts it to objects. This will continue until the
input stream is closed (i.e., the connection is closed).

TD


On Sun, Apr 20, 2014 at 1:36 AM, YouPeng Yang yypvsxf19870...@gmail.comwrote:

 Hi
   I am studing the structure of the Spark Streaming(my spark version is
 0.9.0). I have a question about the SocketReceiver.In the onStart function:
   ---
   protected def onStart() {
  logInfo("Connecting to " + host + ":" + port)
  val socket = new Socket(host, port)
  logInfo("Connected to " + host + ":" + port)
 blockGenerator.start()
 val iterator = bytesToObjects(socket.getInputStream())
 while(iterator.hasNext) {
   val obj = iterator.next
   blockGenerator += obj
 }
   }
   -
   Here the Socket client is created and read data iteratively. My question
 is the onStart function is only called once by the super class
 NetworkReceiver, and correspondingly read data one time.
   When the socket server send data again, how does the SocketReceiver read
 the input data,I can find any src hint about the process.
   In my opinion, the Socket instance should read the data cyclically as
 following:
 -
 InputStream is = socket.getInputStream()

 while(theEndflag){
    if (is.available >= 0) {
  val iterator = bytesToObjects(is)
  while(iterator.hasNext) {
   val obj = iterator.next
   blockGenerator += obj
  }
}
 }
 //theEndflag is the end flag of the loop, should be set to false when needed.
  ---

 I know it may not be the right thought,however i am really curious about
 the Socket read process because it hard to understand.

 Any suggestions will be appreciated.





Re: checkpointing without streaming?

2014-04-21 Thread Tathagata Das
Diana, that is a good question.

When you persist an RDD, the system still remembers the whole lineage of
parent RDDs that created that RDD. If one of the executor fails, and the
persist data is lost (both local disk and memory data will get lost), then
the lineage is used to recreate the RDD. The longer the lineage, the more
recomputation the system has to do in case of failure, and hence higher
recovery time. So it's not a good idea to have a very long lineage, as it
leads to all sorts of problems, like the one Xiangrui pointed to.

Checkpointing an RDD actually saves the RDD data to HDFS and removes
pointers to the parent RDDs (as the data can be regenerated just by reading
from the HDFS file). So that RDDs data does not need to be recomputed
when worker fails, just re-read. In fact, the data is also retained across
driver restarts as it is in HDFS.

RDD.checkpoint was introduced with streaming because streaming is an obvious
use case where the lineage will grow infinitely long (for stateful
computations where each result depends on all the previously received
data). However, this checkpointing is useful for any long running RDD
computation, and I know that people have used RDD.checkpoint() independent
of streaming.
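
For a non-streaming example, a rough sketch of how this is typically used (the
directory and checkpoint interval are placeholders):

sc.setCheckpointDir("hdfs://namenode:8020/user/me/spark-checkpoints")

var state = sc.parallelize(1 to 1000)
for (i <- 1 to 50) {
  state = state.map(_ + 1)         // lineage grows by one step per iteration
  if (i % 10 == 0) {
    state.checkpoint()             // mark for checkpointing, which truncates the lineage
    state.count()                  // an action forces the checkpoint to actually be written
  }
}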

TD


On Mon, Apr 21, 2014 at 1:10 PM, Xiangrui Meng men...@gmail.com wrote:

 Persist doesn't cut lineage. You might run into StackOverflow problem
 with a long lineage. See
 https://spark-project.atlassian.net/browse/SPARK-1006 for example.

 On Mon, Apr 21, 2014 at 12:11 PM, Diana Carroll dcarr...@cloudera.com
 wrote:
  When might that be necessary or useful?  Presumably I can persist and
  replicate my RDD to avoid re-computation, if that's my goal.  What
 advantage
  does checkpointing provide over disk persistence with replication?
 
 
  On Mon, Apr 21, 2014 at 2:42 PM, Xiangrui Meng men...@gmail.com wrote:
 
  Checkpoint clears dependencies. You might need checkpoint to cut a
  long lineage in iterative algorithms. -Xiangrui
 
  On Mon, Apr 21, 2014 at 11:34 AM, Diana Carroll dcarr...@cloudera.com
  wrote:
   I'm trying to understand when I would want to checkpoint an RDD rather
   than
   just persist to disk.
  
   Every reference I can find to checkpoint related to Spark Streaming.
   But
   the method is defined in the core Spark library, not Streaming.
  
   Does it exist solely for streaming, or are there circumstances
 unrelated
   to
   streaming in which I might want to checkpoint...and if so, like what?
  
   Thanks,
   Diana
 
 



Re: Strange behaviour of different SSCs with same Kafka topic

2014-04-14 Thread Tathagata Das
Does this happen at low event rate for that topic as well, or only for a
high volume rate?

TD


On Wed, Apr 9, 2014 at 11:24 PM, gaganbm gagan.mis...@gmail.com wrote:

 I am really at my wits' end here.

 I have different Streaming contexts, lets say 2, and both listening to same
 Kafka topics. I establish the KafkaStream by setting different consumer
 groups to each of them.

 Ideally, I should be seeing the kafka events in both the streams. But what
 I
 am getting is really unpredictable. Only one stream gets a lot of events
 and
 the other one almost gets nothing or very less compared to the other. Also
 the frequency is very skewed. I get a lot of events in one stream
 continuously, and after some duration I get a few events in the other one.

 I don't know where I am going wrong. I can see consumer fetcher threads for
 both the streams that listen to the Kafka topics.

 I can give further details if needed. Any help will be great.

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Strange-behaviour-of-different-SSCs-with-same-Kafka-topic-tp4050.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Spark 0.9.1 released

2014-04-09 Thread Tathagata Das
Hi everyone,

We have just posted Spark 0.9.1, which is a maintenance release with
bug fixes, performance improvements, better stability with YARN and
improved parity of the Scala and Python API. We recommend all 0.9.0
users to upgrade to this stable release.

This is the first release since Spark graduated as a top level Apache
project. Contributions to this release came from 37 developers.

The full release notes are at:
http://spark.apache.org/releases/spark-release-0-9-1.html

You can download the release at:
http://spark.apache.org/downloads.html

Thanks all the developers who contributed to this release:
Aaron Davidson, Aaron Kimball, Andrew Ash, Andrew Or, Andrew Tulloch,
Bijay Bisht, Bouke van der Bijl, Bryn Keller, Chen Chao,
Christian Lundgren, Diana Carroll, Emtiaz Ahmed, Frank Dai,
Henry Saputra, jianghan, Josh Rosen, Jyotiska NK, Kay Ousterhout,
Kousuke Saruta, Mark Grover, Matei Zaharia, Nan Zhu, Nick Lanham,
Patrick Wendell, Prabin Banka, Prashant Sharma, Qiuzhuang,
Raymond Liu, Reynold Xin, Sandy Ryza, Sean Owen, Shixiong Zhu,
shiyun.wxm, Stevo Slavić, Tathagata Das, Tom Graves, Xiangrui Meng

TD


Re: Spark 0.9.1 released

2014-04-09 Thread Tathagata Das
A small additional note: Please use the direct download links in the Spark
Downloads http://spark.apache.org/downloads.html page. The Apache mirrors
take a day or so to sync from the main repo, so may not work immediately.

TD


On Wed, Apr 9, 2014 at 2:54 PM, Tathagata Das
tathagata.das1...@gmail.comwrote:

 Hi everyone,

 We have just posted Spark 0.9.1, which is a maintenance release with
 bug fixes, performance improvements, better stability with YARN and
 improved parity of the Scala and Python API. We recommend all 0.9.0
 users to upgrade to this stable release.

 This is the first release since Spark graduated as a top level Apache
 project. Contributions to this release came from 37 developers.

 The full release notes are at:
 http://spark.apache.org/releases/spark-release-0-9-1.html

 You can download the release at:
 http://spark.apache.org/downloads.html

 Thanks all the developers who contributed to this release:
 Aaron Davidson, Aaron Kimball, Andrew Ash, Andrew Or, Andrew Tulloch,
 Bijay Bisht, Bouke van der Bijl, Bryn Keller, Chen Chao,
 Christian Lundgren, Diana Carroll, Emtiaz Ahmed, Frank Dai,
 Henry Saputra, jianghan, Josh Rosen, Jyotiska NK, Kay Ousterhout,
 Kousuke Saruta, Mark Grover, Matei Zaharia, Nan Zhu, Nick Lanham,
 Patrick Wendell, Prabin Banka, Prashant Sharma, Qiuzhuang,
 Raymond Liu, Reynold Xin, Sandy Ryza, Sean Owen, Shixiong Zhu,
 shiyun.wxm, Stevo Slavić, Tathagata Das, Tom Graves, Xiangrui Meng

 TD



Re: CheckpointRDD has different number of partitions than original RDD

2014-04-08 Thread Tathagata Das
 Classpath entries (from the environment tab):
 - /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2/examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar  (System Classpath)
 - http://10.10.41.67:40368/jars/spark-examples_2.10-assembly-0.9.0-incubating.jar  (Added By User)

 From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
 Sent: Monday, April 07, 2014 7:54 PM
 To: user@spark.apache.org
 Subject: Re: CheckpointRDD has different number of partitions than
 original RDD

 Few things that would be helpful.

 1. Environment settings - you can find them on the environment tab in the
 Spark application UI
 2. Are you setting the HDFS configuration correctly in your Spark program?
 For example, can you write a HDFS file from a Spark program (say
 spark-shell) to your HDFS installation and read it back into Spark (i.e.,
 create a RDD)? You can test this by write an RDD as a text file from the
 shell, and then try to read it back from another shell.
 3. If that works, then lets try explicitly checkpointing an RDD. To do
 this you can take any RDD and do the following.

 myRDD.checkpoint()
 myRDD.count()

 If there is some issue, then this should reproduce the above error.

 TD

 On Mon, Apr 7, 2014 at 3:48 PM, Paul Mogren pmog...@commercehub.com
 wrote:
 Hello, Spark community!  My name is Paul. I am a Spark newbie, evaluating
 version 0.9.0 without any Hadoop at all, and need some help. I run into the
 following error with the StatefulNetworkWordCount example (and similarly in
 my prototype app, when I use the updateStateByKey operation).  I get this
 when running against my small cluster, but not (so far) against local[2].

 61904 [spark-akka.actor.default-dispatcher-2] ERROR
 org.apache.spark.streaming.scheduler.JobScheduler - Error running job
 streaming job 1396905956000 ms.0
 org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[310] at take
 at DStream.scala:586(0) has different number of partitions than original
 RDD MapPartitionsRDD[309] at mapPartitions at StateDStream.scala:66(2)
 at
 org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:99)
 at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:989)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:855)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:870)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:884)
 at org.apache.spark.rdd.RDD.take(RDD.scala:844)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:586)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:585)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at
 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:155)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:744)


 Please let me know what other information would be helpful; I didn't find
 any question submission guidelines.

 Thanks,
 Paul




Re: CheckpointRDD has different number of partitions than original RDD

2014-04-07 Thread Tathagata Das
Few things that would be helpful.

1. Environment settings - you can find them on the environment tab in the
Spark application UI
2. Are you setting the HDFS configuration correctly in your Spark program?
For example, can you write an HDFS file from a Spark program (say
spark-shell) to your HDFS installation and read it back into Spark (i.e.,
create an RDD)? You can test this by writing an RDD as a text file from the
shell, and then trying to read it back from another shell (a quick sketch of
this is below).
3. If that works, then lets try explicitly checkpointing an RDD. To do this
you can take any RDD and do the following.

myRDD.checkpoint()
myRDD.count()

If there is some issue, then this should reproduce the above error.
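
For step 2, a quick sketch of such a round-trip test from spark-shell (the path
is a placeholder):

val data = sc.parallelize(1 to 100)
data.saveAsTextFile("hdfs://namenode:8020/tmp/spark-hdfs-test")

// from another shell (or the same one):
val readBack = sc.textFile("hdfs://namenode:8020/tmp/spark-hdfs-test")
println(readBack.count())   // should print 100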

TD


On Mon, Apr 7, 2014 at 3:48 PM, Paul Mogren pmog...@commercehub.com wrote:

 Hello, Spark community!  My name is Paul. I am a Spark newbie, evaluating
 version 0.9.0 without any Hadoop at all, and need some help. I run into the
 following error with the StatefulNetworkWordCount example (and similarly in
 my prototype app, when I use the updateStateByKey operation).  I get this
 when running against my small cluster, but not (so far) against local[2].

 61904 [spark-akka.actor.default-dispatcher-2] ERROR
 org.apache.spark.streaming.scheduler.JobScheduler - Error running job
 streaming job 1396905956000 ms.0
 org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[310] at take
 at DStream.scala:586(0) has different number of partitions than original
 RDD MapPartitionsRDD[309] at mapPartitions at StateDStream.scala:66(2)
 at
 org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:99)
 at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:989)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:855)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:870)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:884)
 at org.apache.spark.rdd.RDD.take(RDD.scala:844)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:586)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:585)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at
 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:155)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:744)


 Please let me know what other information would be helpful; I didn't find
 any question submission guidelines.

 Thanks,
 Paul



Re: Spark Streaming + Kafka + Mesos/Marathon strangeness

2014-03-28 Thread Tathagata Das
The cleaner ttl was introduced as a brute force method to clean all old
data and metadata in the system, so that the system can run 24/7. The
cleaner ttl should be set to a large value, so that RDDs older than that
are not used. Though there are some cases where you may want to use an RDD
again and again for an infinite duration, in which case no value of TTL is
good enough. There is a way around it - if you are going to use an RDD
beyond the ttl value, then you will have to recreate the RDD after some
interval.

However, the ideal solution is actually to identify the stuff (RDDs,
broadcasts, etc.) that are ready to be GCed and then clear their associated
data. This is currently being implemented as a part of this PR:
https://github.com/apache/spark/pull/126
This will make setting the cleaner TTL unnecessary and reduce errors
related to cleaning up of needed RDDs.
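
For reference, a sketch of setting a conservatively large TTL (the property
takes seconds; the value here is only illustrative):

val conf = new SparkConf()
  .setAppName("my-streaming-app")
  .set("spark.cleaner.ttl", "3600")   // keep this well above the longest time any RDD must stay usable
val ssc = new StreamingContext(conf, Seconds(2))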






On Thu, Mar 27, 2014 at 3:47 PM, Evgeny Shishkin itparan...@gmail.comwrote:


 On 28 Mar 2014, at 01:44, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 The more I think about it the problem is not about /tmp, its more about
 the workers not having enough memory. Blocks of received data could be
 falling out of memory before it is getting processed.
 BTW, what is the storage level that you are using for your input stream?
 If you are using MEMORY_ONLY, then try MEMORY_AND_DISK. That is safer
 because it ensures that if received data falls out of memory it will be at
 least saved to disk.

 TD


 And I saw such errors because of cleaner.ttl,
 which erases everything, even needed RDDs.




 On Thu, Mar 27, 2014 at 2:29 PM, Scott Clasen scott.cla...@gmail.comwrote:

 Heh sorry that wasnt a clear question, I know 'how' to set it but dont
 know
 what value to use in a mesos cluster, since the processes are running in
 lxc
 containers they wont be sharing a filesystem (or machine for that matter)

 I cant use an s3n:// url for local dir can I?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Kafka-Mesos-Marathon-strangeness-tp3285p3373.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: spark streaming and the spark shell

2014-03-27 Thread Tathagata Das
Seems like the configuration of the Spark worker is not right. Either the
worker has not been given enough memory or the allocation of the memory to
the RDD storage needs to be fixed. If configured correctly, the Spark
workers should not get OOMs.



On Thu, Mar 27, 2014 at 2:52 PM, Evgeny Shishkin itparan...@gmail.comwrote:


 2.   I notice that once I start ssc.start(), my stream starts processing
 and
 continues indefinitely...even if I close the socket on the server end (I'm
 using unix command nc to mimic a server as explained in the streaming
 programming guide .)  Can I tell my stream to detect if it's lost a
 connection and therefore stop executing?  (Or even better, to attempt to
 re-establish the connection?)



 Currently, not yet. But I am aware of this and this behavior will be
 improved in the future.


 Now I understand why our spark streaming job starts to generate zero-sized
 rdds from kafkainput,
 when one worker gets OOM or crashes.

 And we can't detect it! Great. So spark streaming just doesn't suit yet
 for 24/7 operation =\



Re: Spark Streaming - Shared hashmaps

2014-03-26 Thread Tathagata Das
When you say launch long-running tasks, do you mean long-running Spark
jobs/tasks, or long-running tasks in another system?

If the rate of requests from Kafka is not high (in terms of records per
second), you could collect the records in the driver, and maintain the
shared bag in the driver. A separate thread in the driver could pick
stuff from the bag and launch tasks. This is a slightly unorthodox use of
Spark Streaming, but should work.
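
A rough sketch of that driver-side pattern (requestStream stands for your
decoded Kafka DStream, and Request / its id field are hypothetical
placeholders):

import java.util.concurrent.ConcurrentHashMap

case class Request(id: String, payload: String)          // placeholder type

val pending = new ConcurrentHashMap[String, Request]()    // the shared "bag", lives in the driver

requestStream.foreachRDD { rdd =>
  // collect() brings this batch's requests back to the driver, so keep the rate modest
  rdd.collect().foreach(req => pending.put(req.id, req))
}

// Elsewhere in the driver, a separate thread can look requests up by ID,
// remove them from `pending`, and kick off the long-running work.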

If the rate of request from Kafka is high, then I am not sure how you can
sustain that many long running tasks (assuming 1 task corresponding to each
request from Kafka).

TD


On Wed, Mar 26, 2014 at 1:19 AM, Bryan Bryan bryanbryan...@gmail.comwrote:

 Hi there,

 I have read about the two fundamental shared features in spark
 (broadcasting variables and accumulators), but this is what i need.

 I'm using spark streaming in order to get requests from Kafka, these
 requests may launch long-running tasks, and i need to control them:

 1) Keep them in a shared bag, like a Hashmap, to retrieve them by ID, for
 example.
 2) Retrieve an instance of this object/task whatever on-demand
 (on-request, in fact)


 Any idea about that? How can I share objects between slaves? May I use
 something outside of Spark (maybe Hazelcast)?


 Regards.



Re: streaming questions

2014-03-26 Thread Tathagata Das
*Answer 1:* Make sure you are using master as local[n] with n > 1
(assuming you are running it in local mode). The way Spark Streaming
works is that it assigns a core to the data receiver, and so if you
run the program with only one core (i.e., with local or local[1]),
then it won't have resources to process data along with receiving it.
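
For example, a short sketch of creating such a context (the app name and batch
interval are arbitrary):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// "local[2]" gives one thread to the receiver and one to the processing.
val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(2))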


*Answer 2:* Spark Streaming is designed to replicate the received data
within the machines in a Spark cluster for fault-tolerance. However, when
you are running in local mode, since there is only one machine, the
blocks of data aren't able to replicate. This is expected and safe to
ignore in local mode.


*Answer 3:* You can do something like

wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
  if (rdd.take(1).size == 1) {
    // There exists at least one element in the RDD, so save it to a file
    rdd.saveAsTextFile("outdir-" + time.milliseconds)  // generate a file name based on time
  }
})


TD



On Wed, Mar 26, 2014 at 11:08 AM, Diana Carroll dcarr...@cloudera.com
wrote:
 I'm trying to understand Spark streaming, hoping someone can help.

 I've kinda-sorta got a version of Word Count running, and it looks like
 this:

 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._

 object StreamingWordCount {

   def main(args: Array[String]) {
     if (args.length < 3) {
       System.err.println("Usage: StreamingWordCount <master> <hostname> <port>")
       System.exit(1)
     }

     val master = args(0)
     val hostname = args(1)
     val port = args(2).toInt

     val ssc = new StreamingContext(master, "Streaming Word Count", Seconds(2))
     val lines = ssc.socketTextStream(hostname, port)
     val words = lines.flatMap(line => line.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
     wordCounts.print()
     ssc.start()
     ssc.awaitTermination()
   }
 }

 (I also have a small script that sends text to that port.)

 Question 1:
 When I run this, I don't get any output from the wordCounts.print as long
as
 my data is still streaming.  I have to stop my streaming data script
before
 my program will display the word counts.

 Why is that?  What if my stream is indefinite?  I thought the point of
 Streaming was that it would process it in real time?

 Question 2:
 While I run this (and the stream is still sending) I get continuous
warning
 messages like this:
 14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623200 already
 exists on this machine; not re-adding it
 14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623400 already
 exists on this machine; not re-adding it

 What does that mean?

 Question 3:
 I tried replacing the wordCounts.print() line with
 wordCounts.saveAsTextFiles(file:/my/path/outdir).
 This results in a new outdir-timestamp file being created
 every two seconds... even if there's no data during that time period. Is
 there a way to tell it to save only if there's data?

 Thanks!


Re: rdd.saveAsTextFile problem

2014-03-26 Thread Tathagata Das
Can you give us the more detailed exception + stack trace in the log? It
should be in the driver log. If not, please take a look at the executor
logs, through the web ui to find the stack trace.

TD


On Tue, Mar 25, 2014 at 10:43 PM, gaganbm gagan.mis...@gmail.com wrote:

 Hi Folks,

 Is this issue resolved ? If yes, could you please throw some light on how
 to
 fix this ?

 I am facing the same problem during writing to text files.

 When I do

 stream.foreachRDD(rdd => {
   rdd.saveAsTextFile("Some path")
 })

 This works fine for me. But it creates multiple text files for each
 partition within an RDD.

 So I tried with coalesce option to merge my results in a single file for
 each RDD as :

 stream.foreachRDD(rdd => {
   rdd.coalesce(1, true).saveAsTextFile("Some path")
 })

 This fails with :
 org.apache.spark.SparkException: Job aborted: Task 75.0:0 failed 1 times
 (most recent failure: Exception failure: java.lang.IllegalStateException:
 unread block data)

 I am using Spark Streaming 0.9.0

 Any clue what's going wrong when using coalesce ?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/rdd-saveAsTextFile-problem-tp176p3238.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: spark executor/driver log files management

2014-03-25 Thread Tathagata Das
The logs from the executor are redirected to stdout only because there is a
default log4j.properties that is configured to do so. If you put your own
log4j.properties with a rolling file appender in the classpath (refer to the
Spark docs for that), all the logs will get redirected to separate files
that will get rolled over. So even though there will still be a stdout,
not much will get printed in it. The side effect of this modification
is that you won't be able to see the logs from the Spark web UI, as that only
shows stdout.
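
For reference, a log4j.properties along these lines (the file path and pattern
are only placeholders) sends everything to a time-rolled file instead of stdout:

# Route the root logger to a rolling file appender (from the log4j "extras" package)
log4j.rootLogger=INFO, rolling
log4j.appender.rolling=org.apache.log4j.rolling.RollingFileAppender
log4j.appender.rolling.rollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy
log4j.appender.rolling.rollingPolicy.fileNamePattern=/var/log/spark/executor.%d{yyyy-MM-dd}.log.gz
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n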




On Mon, Mar 24, 2014 at 10:48 PM, Sourav Chandra 
sourav.chan...@livestream.com wrote:

 Hi TD,

 I thought about that but was not sure whether this will have any impact in
 spark UI/ Executor runner as it redirects stream to stderr/stdout. But
 ideally it should not as it will fetch the log record from stderr file
 (which is latest)..

 Is my understanding correct?

 Thanks,
 Sourav


 On Tue, Mar 25, 2014 at 3:26 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 You can use RollingFileAppenders in log4j.properties.

 http://logging.apache.org/log4j/extras/apidocs/org/apache/log4j/rolling/RollingFileAppender.html

 You can have other scripts delete old logs.

 TD


 On Mon, Mar 24, 2014 at 12:20 AM, Sourav Chandra 
 sourav.chan...@livestream.com wrote:

 Hi,

 I have few questions regarding log file management in spark:

 1. Currently I did not find any way to modify the log file name for
 executors/drivers. It's hardcoded as stdout and stderr. Also, there is no log
 rotation.

 In case of streaming application this will grow forever and become
 unmanageable. Is there any way to overcome this?

 Thanks,
 --

 Sourav Chandra

 Senior Software Engineer

 · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

 sourav.chan...@livestream.com

 o: +91 80 4121 8723

 m: +91 988 699 3746

 skype: sourav.chandra

 Livestream

 Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
 Block, Koramangala Industrial Area,

 Bangalore 560034

 www.livestream.com





 --

 Sourav Chandra

 Senior Software Engineer

 · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

 sourav.chan...@livestream.com

 o: +91 80 4121 8723

 m: +91 988 699 3746

 skype: sourav.chandra

 Livestream

 Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
 Block, Koramangala Industrial Area,

 Bangalore 560034

 www.livestream.com



Re: Spark Streaming ZeroMQ Java Example

2014-03-25 Thread Tathagata Das
Unfortunately there isn't one right now. But it is probably not too hard to
start with JavaNetworkWordCount
(https://github.com/apache/incubator-spark/blob/master/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java)
and use ZeroMQUtils in the same way as the Scala ZeroMQWordCount
example. Basically you have to change this line
(https://github.com/apache/incubator-spark/blob/master/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java#L65)
and create a ZeroMQ stream rather than a socketStream. Refer to the
ZeroMQUtils docs
(http://spark.apache.org/docs/latest/api/external/zeromq/index.html#org.apache.spark.streaming.zeromq.ZeroMQUtils$.createStream)
for more details.

Would be great if you can contribute it back ;)

TD




On Mon, Mar 24, 2014 at 11:28 PM, goofy real goofyrealt...@gmail.com wrote:

 Is there a ZeroMQWordCount Java sample code?


 https://github.com/apache/incubator-spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala



Re: [bug?] streaming window unexpected behaviour

2014-03-25 Thread Tathagata Das
You can probably do it in a simpler but sort of hacky way!

If your window size is W and sliding interval S, you can do some math to
figure out how many of the first windows are actually partial windows. It's
probably math.ceil(W/S). So in a windowDStream.foreachRDD() you can
increment a global counter to count how many RDDs have been generated and
ignore the first few RDDs.

windowDStream.foreachRDD(rdd => {
  Global.counter += 1
  if (Global.counter <= math.ceil(W.toDouble / S)) {
    // ignore the first few (partial) windows
  } else {
    // do something awesome
  }
})


On Tue, Mar 25, 2014 at 7:29 AM, Adrian Mocanu amoc...@verticalscope.com wrote:

 Let me rephrase that,
 Do you think it is possible to use an accumulator to skip the first few
 incomplete RDDs?

 -Original Message-
 From: Adrian Mocanu [mailto:amoc...@verticalscope.com]
 Sent: March-25-14 9:57 AM
 To: user@spark.apache.org
 Cc: u...@spark.incubator.apache.org
 Subject: RE: [bug?] streaming window unexpected behaviour

 Thanks TD!
 Is it possible to perhaps add another window method that doesn't
 generate partial windows? Or, is it possible to remove the first few
 partial windows? I'm thinking of using an accumulator to count how many
 windows there are.

 -A

 -Original Message-
 From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
 Sent: March-24-14 6:55 PM
 To: user@spark.apache.org
 Cc: u...@spark.incubator.apache.org
 Subject: Re: [bug?] streaming window unexpected behaviour

 Yes, I believe that is current behavior. Essentially, the first few RDDs
 will be partial windows (assuming window duration > sliding interval).

 TD


 On Mon, Mar 24, 2014 at 1:12 PM, Adrian Mocanu amoc...@verticalscope.com
 wrote:
  I have what I would call unexpected behaviour when using window on a
 stream.
 
  I have 2 windowed streams with a 5s batch interval. One window stream
  is (5s,5s)=smallWindow and the other (10s,5s)=bigWindow
 
  What I've noticed is that the 1st RDD produced by bigWindow is
  incorrect and is of the size 5s not 10s. So instead of waiting 10s and
  producing 1 RDD with size 10s, Spark produced the 1st 10s RDD of size 5s.
 
  Why is this happening? To me it looks like a bug; Matei or TD can you
  verify that this is correct behaviour?
 
 
 
 
 
  I have the following code
 
  val ssc = new StreamingContext(conf, Seconds(5))
 
 
 
  val smallWindowStream = ssc.queueStream(smallWindowRddQueue)
 
  val bigWindowStream = ssc.queueStream(bigWindowRddQueue)
 
 
 
  val smallWindow = smallWindowReshapedStream.window(Seconds(5), Seconds(5))
    .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
 
  val bigWindow = bigWindowReshapedStream.window(Seconds(10), Seconds(5))
    .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
 
 
 
  -Adrian
 
 



Re: Sliding Window operations do not work as documented

2014-03-24 Thread Tathagata Das
Hello Sanjay,

Yes, your understanding of lazy semantics is correct. But ideally
every batch should be read based on the batch interval provided in the
StreamingContext. Can you open a JIRA on this?

On Mon, Mar 24, 2014 at 7:45 AM, Sanjay Awatramani
sanjay_a...@yahoo.com wrote:
 Hi All,

 I found out why this problem exists. Consider the following scenario:
 - a DStream is created from any source. (I've checked with file and socket)
 - No actions are applied to this DStream
 - Sliding Window operation is applied to this DStream and an action is
 applied to the sliding window.
 In this case, Spark will not even read the input stream in the batch in
 which the sliding interval isn't a multiple of batch interval. Put another
 way, it won't read the input when it doesn't have to apply the window
 function. This is happening because all transformations in Spark are lazy.

 How to fix this or workaround it (see line#3):
 JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(1 * 60 * 1000));
 JavaDStream<String> inputStream = stcObj.textFileStream("/Input");
 inputStream.print(); // This is the workaround
 JavaDStream<String> objWindow = inputStream.window(new Duration(windowLen * 60 * 1000), new Duration(slideInt * 60 * 1000));
 objWindow.dstream().saveAsTextFiles("/Output", "");


 The Window Operations example in the streaming guide implies that Spark
 will read the stream in every batch, which is not happening because of the
 lazy transformations.
 Wherever a sliding window is used, in most cases no actions will
 be taken on the pre-window batch, hence my gut feeling was that Streaming
 would read every batch if any actions are being taken on the windowed
 stream.

 Regards,
 Sanjay


 On Friday, 21 March 2014 8:06 PM, Sanjay Awatramani sanjay_a...@yahoo.com
 wrote:
 Hi,

 I want to run a map/reduce process over last 5 seconds of data, every 4
 seconds. This is quite similar to the sliding window pictorial example under
 Window Operations section on
 http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html
 .

 The RDDs returned by the window transformation function are incorrect in my
 case. To investigate this further, I ran a series of examples with varying
 values of window length & slide interval. Summary of the test results:
 (window length, slide interval) - result
 (3,1) - success
 (4,2) - success
 (3,2) - fail
 (4,3) - fail
 (5,4) - fail
 (5,2) - fail

 The only condition mentioned in the doc is that the two values (5 & 4) should
 be multiples of the batch interval (1 in my case), and obviously I get a runtime
 error if I attempt to violate this condition. Looking at my results, it
 seems that failures result when the window length isn't a multiple of the
 slide interval.

 My code:
 JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new
 Duration(1 * 60 * 1000));
 JavaDStreamString inputStream = stcObj.textFileStream(/Input);
 JavaDStreamString objWindow = inputStream.window(new
 Duration(windowLen*60*1000), new Duration(slideInt*60*1000));
 objWindow.dstream().saveAsTextFiles(/Output, );

 Detailed results:
 (3,1) - success
 @t_0: [inputStream's RDD@t_0]
 @t_1: [inputStream's RDD@t_0,1]
 @t_2: [inputStream's RDD@t_0,1,2]
 @t_3: [inputStream's RDD@t_1,2,3]
 @t_4: [inputStream's RDD@t_2,3,4]
 @t_5: [inputStream's RDD@t_3,4,5]

 (4,2) - success
 @t_0: nothing
 @t_1: [inputStream's RDD@t_0,1]
 @t_2: nothing
 @t_3: [inputStream's RDD@t_0,1,2,3]
 @t_4: nothing
 @t_5: [inputStream's RDD@t_2,3,4,5]

 (3,2) - fail
 @t_0: nothing
 @t_1: [inputStream's RDD@t_0,1]
 @t_2: nothing
 @t_3: [inputStream's RDD@t_2,3]//(expected RDD@t_1,2,3)
 @t_4: nothing
 @t_5: [inputStream's RDD@t_4,5]//(expected RDD@t_3,4,5)

 (4,3) - fail
 @t_0: nothing
 @t_1: nothing
 @t_2: [inputStream's RDD@t_0,1,2]
 @t_3: nothing
 @t_4: nothing
 @t_5: [inputStream's RDD@t_3,4,5]//(expected RDD@t_2,3,4,5)

 (5,4) - fail
 @t_0: nothing
 @t_1: nothing
 @t_2: nothing
 @t_3: [inputStream's RDD@t_0,1,2,3]
 @t_4: nothing
 @t_5: nothing
 @t_6: nothing
 @t_7: [inputStream's RDD@t_4,5,6,7]//(expected RDD@t_3,4,5,6,7)

 (5,2) - fail
 @t_0: nothing
 @t_1: [inputStream's RDD@t_0,1]
 @t_2: nothing
 @t_3: [inputStream's RDD@t_0,1,2,3]
 @t_4: nothing
 @t_5: [inputStream's RDD@t_2,3,4,5]//(expected RDD@t_1,2,3,4,5)
 @t_6: nothing
 @t_7: [inputStream's RDD@t_4,5,6,7]//(expected RDD@t_3,4,5,6,7)

 I have run all the above examples twice to be sure!
 I believe either my understanding of the sliding window mechanism is incorrect
 or there is a problem in the sliding window mechanism.

 Regards,
 Sanjay




Re: [bug?] streaming window unexpected behaviour

2014-03-24 Thread Tathagata Das
Yes, I believe that is current behavior. Essentially, the first few
RDDs will be partial windows (assuming window duration > sliding
interval).

TD


On Mon, Mar 24, 2014 at 1:12 PM, Adrian Mocanu
amoc...@verticalscope.com wrote:
 I have what I would call unexpected behaviour when using window on a stream.

 I have 2 windowed streams with a 5s batch interval. One window stream is
 (5s,5s)=smallWindow and the other (10s,5s)=bigWindow

 What I've noticed is that the 1st RDD produced by bigWindow is incorrect and
 is of the size 5s not 10s. So instead of waiting 10s and producing 1 RDD
 with size 10s, Spark produced the 1st 10s RDD of size 5s.

 Why is this happening? To me it looks like a bug; Matei or TD can you verify
 that this is correct behaviour?





 I have the following code

 val ssc = new StreamingContext(conf, Seconds(5))



 val smallWindowStream = ssc.queueStream(smallWindowRddQueue)

 val bigWindowStream = ssc.queueStream(bigWindowRddQueue)



  val smallWindow = smallWindowReshapedStream.window(Seconds(5), Seconds(5))
    .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
 
  val bigWindow = bigWindowReshapedStream.window(Seconds(10), Seconds(5))
    .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))



 -Adrian




Re: Explain About Logs NetworkWordcount.scala

2014-03-07 Thread Tathagata Das
I am not sure how to debug this without any more information about the
source. Can you monitor on the receiver side that data is being accepted by
the receiver but not reported?

TD


On Wed, Mar 5, 2014 at 7:23 AM, eduardocalfaia e.costaalf...@unibs.it wrote:

 Hi TD,
 I have seen in the web UI that the stage result has been zero, and there is
 nothing in the GC Time field.
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n2306/CaptureStage.png
 



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Explain-About-Logs-NetworkWordcount-scala-tp1835p2306.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: NoSuchMethodError - Akka - Props

2014-03-06 Thread Tathagata Das
Are you launching your application using the scala or the java command? The scala
command brings in a version of Akka that we have found to cause conflicts
with Spark's version of Akka. So it's best to launch using java.

TD



On Thu, Mar 6, 2014 at 3:45 PM, Deepak Nulu deepakn...@gmail.com wrote:

 I see the same error. I am trying a standalone example integrated into a
 Play
 Framework v2.2.2 application. The error occurs when I try to create a Spark
 Streaming Context. Compilation succeeds, so I am guessing it has to do with
 the version of Akka getting picked up at runtime.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-Akka-Props-tp2191p2375.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: NoSuchMethodError in KafkaReciever

2014-03-06 Thread Tathagata Das
I don't have an Eclipse setup so I am not sure what is going on here. I would
try to use Maven on the command line with a pom to see if this compiles.
Also, try to clean up your system Maven cache. Who knows whether it pulled in
a wrong version of Kafka 0.8 and has been using it all along. Blowing away the
cache and clean compiling will make sure the right Kafka is loaded.

Hope this helps.

TD


On Sat, Mar 1, 2014 at 8:26 PM, venki-kratos ve...@thekratos.com wrote:

 I am trying to use code similar to the following:

 public JavaPairDStream<String, String> openStream() {
     HashMap<String, String> kafkaParams = Maps.newHashMap();
     kafkaParams.put(ZK_CONNECT, kafkaConfig.getString(ZK_CONNECT));
     kafkaParams.put(CONSUMER_GRP_ID, kafkaConfig.getString(CONSUMER_GRP_ID));

     Map<String, Integer> topicMap = Maps.newHashMap();
     topicMap.put(kafkaConfig.getString(ZK_TOPIC),
             kafkaConfig.getInteger(CONSUMER_THREAD_COUNT, 1));

     JavaPairDStream<String, String> inputStream =
             KafkaUtils.createStream(streamingContext,
                     String.class, String.class,
                     StringDecoder.class, StringDecoder.class,
                     kafkaParams, topicMap,
                     StorageLevel.MEMORY_AND_DISK_SER_2());
     return inputStream;
 }

 I have spark-streaming_2.10-0.9.0-incubating.jar and
 spark-streaming-kafka_2.10-0.9.0-incubating.jar
 in the classpath using POM and m2e in Eclipse. JVM version is set to 1.6

 I get the following error,

 14/03/02 09:29:15 INFO kafka.KafkaReceiver: Connected to localhost:2181
 14/03/02 09:29:15 ERROR kafka.KafkaReceiver: Error receiving data
 java.lang.NoSuchMethodException:
 java.lang.Object.<init>(kafka.utils.VerifiableProperties)
 at java.lang.Class.getConstructor0(Class.java:2763)
 at java.lang.Class.getConstructor(Class.java:1693)
 at

 org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:108)
 at

 org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:126)
 at

 org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:173)
 at

 org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:169)
 at

 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
 at

 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
 at org.apache.spark.scheduler.Task.run(Task.scala:53)
 at .

 This is similar to code in JavaKafkaStreamSuite.testKafkaStream. I find
 that
 the kafka jar - kafka_2.10-0.8.0 does have such a constructor.

 What is going wrong? Can someone help solve this mystery and help with my
 misery? Basically stuck for last 2 days - as I am a Java Guy and would like
 to develop downstream code in Java



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-in-KafkaReciever-tp2209.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark streaming on ec2

2014-02-27 Thread Tathagata Das
Yes! Spark streaming programs are just like any spark program and so any
ec2 cluster setup using the spark-ec2 scripts can be used to run spark
streaming programs as well.



On Thu, Feb 27, 2014 at 10:11 AM, Aureliano Buendia buendia...@gmail.com wrote:

 Hi,

 Does the ec2 support for spark 0.9 also include spark streaming? If not,
 is there an equivalent?




Re: Spark streaming on ec2

2014-02-27 Thread Tathagata Das
Yes, the default Spark EC2 cluster runs the standalone deploy mode. Since
Spark 0.9, the standalone deploy mode allows you to launch the driver app
within the cluster itself and automatically restart it if it fails. You can
read about launching your app inside the cluster here:
http://spark.incubator.apache.org/docs/latest/spark-standalone.html#connecting-an-application-to-the-cluster
Using this you can launch your streaming app as well.

TD


On Thu, Feb 27, 2014 at 5:35 PM, Aureliano Buendia buendia...@gmail.com wrote:

 How about spark stream app itself? Does the ec2 script also provide means
 for daemonizing and monitoring spark streaming apps which are supposed to
 run 24/7? If not, any suggestions for how to do this?


 On Thu, Feb 27, 2014 at 8:23 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Zookeeper is automatically set up in the cluster as Spark uses Zookeeper.
 However, you have to set up your own input source like Kafka or Flume.

 TD


 On Thu, Feb 27, 2014 at 10:32 AM, Aureliano Buendia buendia...@gmail.com
  wrote:




 On Thu, Feb 27, 2014 at 6:17 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Yes! Spark streaming programs are just like any spark program and so
 any ec2 cluster setup using the spark-ec2 scripts can be used to run spark
 streaming programs as well.


 Great. Does it come with any input source support as well? (Eg kafka
 requires setting up zookeeper).





 On Thu, Feb 27, 2014 at 10:11 AM, Aureliano Buendia 
 buendia...@gmail.com wrote:

 Hi,

 Does the ec2 support for spark 0.9 also include spark streaming? If
 not, is there an equivalent?







