Re: jars are not loading from 1.3. those set via setJars to the SparkContext

2015-06-22 Thread Tathagata Das
Do you have the Kafka producer in your classpath? If so, how are you adding
that library? Are you running on YARN, Mesos, Standalone, or local? These
details will be very useful.

On Mon, Jun 22, 2015 at 8:34 AM, Murthy Chelankuri kmurt...@gmail.com
wrote:

 I am using Spark Streaming. What I am trying to do is send a few messages
 to a Kafka topic, which is where it is failing.

 java.lang.ClassNotFoundException: com.abc.mq.msg.ObjectEncoder
 at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:264)
 at kafka.utils.Utils$.createObject(Utils.scala:438)
 at kafka.producer.Producer.<init>(Producer.scala:61)

 On Mon, Jun 22, 2015 at 8:24 PM, Murthy Chelankuri kmurt...@gmail.com
 wrote:

 I have been using Spark for the last 6 months, with version 1.2.0.

 I am trying to migrate to 1.3.0, but the same program I have written is
 not working.

 It gives a ClassNotFoundException when I try to load some dependent jars
 from the main program.

 This used to work in 1.2.0 when I set the array of all dependent jars on
 the SparkContext, but it is not working in 1.3.0.


 Please help me how to resolve this.


 Thanks,
 Murthy Chelankuri





Re: Serial batching with Spark Streaming

2015-06-20 Thread Tathagata Das
No, it does not. By default, batch X+1 will be started only after all the
retries etc. related to batch X are done.

Yes, one RDD per batch per DStream. However, the RDD could be a union of
multiple RDDs (e.g. RDDs generated by a windowed DStream, or a unioned
DStream).

TD

On Fri, Jun 19, 2015 at 3:16 PM, Michal Čizmazia mici...@gmail.com wrote:

 Thanks Tathagata!

 I will use *foreachRDD*/*foreachPartition*() instead of *transform*() then.

 Does the default scheduler initiate the execution of *batch X+1*
 after *batch X* even if tasks for *batch X* need to be *retried
 due to failures*? If not, please could you suggest workarounds and point
 me to the code?

 One more thing was not 100% clear to me from the documentation: Is there
 exactly *1 RDD* published *per batch interval* in a DStream?



 On 19 June 2015 at 16:58, Tathagata Das t...@databricks.com wrote:

 I see what the problem is. You are adding a sleep in the transform
 operation. The transform function is called at the time of preparing the
 Spark jobs for a batch. It should not be running any time-consuming
 operation like an RDD action or a sleep. Since this operation needs to run
 every batch interval, doing a blocking, long-running operation there messes
 with the need to run every batch interval.

 I will try to make this clearer in the guide. I had not seen anyone do
 something like this before, and therefore it did not occur to me that this
 could happen. As long as you don't do time-consuming blocking operations in
 the transform function, the batches will be generated, scheduled, and
 executed in serial order by default.
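
A minimal sketch of the distinction, assuming stream is an existing
DStream[String] (the names are illustrative):

    // DON'T: a blocking call inside transform delays job generation for
    // every batch, because this function runs on the driver at
    // job-preparation time.
    val delayed = stream.transform { rdd =>
      Thread.sleep(5000)
      rdd
    }

    // DO: put time-consuming work in an output operation such as
    // foreachRDD, where it runs as part of the batch's jobs.
    stream.foreachRDD { rdd =>
      val n = rdd.count()
      println(s"processed $n records in this batch")
    }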

 On Fri, Jun 19, 2015 at 11:33 AM, Michal Čizmazia mici...@gmail.com
 wrote:

 Binh, thank you very much for your comment and code. Please could you
 outline an example use of your stream? I am a newbie to Spark. Thanks again!

 On 18 June 2015 at 14:29, Binh Nguyen Van binhn...@gmail.com wrote:

 I haven't tried with 1.4, but I tried with 1.3 a while ago and I could
 not get the serialized behavior from the default scheduler when there are
 failures and retries, so I created a customized stream like this.

 class EachSeqRDD[T: ClassTag] (
     parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
   ) extends DStream[Unit](parent.ssc) {

   override def slideDuration: Duration = parent.slideDuration

   override def dependencies: List[DStream[_]] = List(parent)

   override def compute(validTime: Time): Option[RDD[Unit]] = None

   override private[streaming] def generateJob(time: Time): Option[Job] = {
     val pendingJobs = ssc.scheduler.getPendingTimes().size
     logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
     // do not generate a new RDD if there is a pending job
     if (pendingJobs == 0) {
       parent.getOrCompute(time) match {
         case Some(rdd) =>
           val jobFunc = () => {
             ssc.sparkContext.setCallSite(creationSite)
             eachSeqFunc(rdd, time)
           }
           Some(new Job(time, jobFunc))
         case None => None
       }
     } else {
       None
     }
   }
 }

 object DStreamEx {
   implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
     def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
       // because the DStream is reachable from the outer object here, and because
       // DStreams can't be serialized with closures, we can't proactively check
       // it for serializability and so we pass the optional false to SparkContext.clean
       new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
     }
   }
 }
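
For reference, a minimal usage sketch of the implicit class above, assuming
a DStream[String] named lines and the two classes on the classpath:

    import DStreamEx._

    // Batches are handled strictly one at a time: generateJob above
    // returns None for a batch while earlier jobs are still pending.
    lines.eachSeqRDD { (rdd, time) =>
      println("batch at %s has %d lines".format(time, rdd.count()))
    }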

 -Binh

 On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia mici...@gmail.com
 wrote:

 Tathagata, thanks for your response. You are right! Everything seems
 to work as expected.

 Could you please help me understand why the time for processing of all
 jobs for a batch is always less than 4 seconds?

 Please see my playground code below.

 The last modified time of the input (lines) RDD dump files seems to
 match the Thread.sleep delays (20s or 5s) in the transform operation
 or the batching interval (10s): 20s, 5s, 10s.

 However, neither the batch processing time in the Streaming tab nor
 the last modified time of the output (words) RDD dump files reflect
 the Thread.sleep delays.

 07:20   3240  001_lines_...
   07:21 117   001_words_...
 07:41   37224 002_lines_...
   07:43 252   002_words_...
 08:00   37728 003_lines_...
   08:02 504   003_words_...
 08:20   38952 004_lines_...
   08:22 756   004_words_...
 08:40   38664 005_lines_...
   08:42 999   005_words_...
 08:45   38160 006_lines_...
   08:47 1134  006_words_...
 08:50   9720  007_lines_...
   08:51 1260  007_words_...
 08:55   9864  008_lines_...
   08:56 1260  008_words_...
 09:00   10656 009_lines_...
   09:01 1395  009_words_...
 09:05   11664 010_lines_...
   09:06 1395  010_words_...
 09:11   10935 011_lines_...
   09:11 1521

Re: RE: Spark or Storm

2015-06-19 Thread Tathagata Das
I agree with Cody. It's pretty hard for any framework to provide built-in
support for that, since the semantics completely depend on what data store
you want to use it with. Providing interfaces does help a little, but even
with those interfaces, the user still has to do most of the heavy lifting;
the user has to understand what is actually going on AND implement all the
code needed to ensure that the unique ID and the data are updated
atomically, according to the capabilities and APIs provided by the data store.

On Fri, Jun 19, 2015 at 7:45 AM, Cody Koeninger c...@koeninger.org wrote:


 http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics

 semantics of output operations section

 Is this really not clear?

 As for the general tone of "why doesn't the framework do it for you"... in
 my opinion, this is essential complexity for delivery semantics in a
 distributed system, not incidental complexity.  You need to actually
 understand and be responsible for what's going on, unless you're talking
 about very narrow use cases (i.e. outputting to a known datastore with
 known semantics and schema)

 On Fri, Jun 19, 2015 at 7:26 AM, Ashish Soni asoni.le...@gmail.com
 wrote:

 My understanding of exactly-once semantics is that it is handled in the
 framework itself, but it is not very clear from the documentation. I
 believe the documentation needs to be updated with a simple example so that
 it is clear to the end user. This is very critical when someone is
 evaluating the framework and does not have enough time to validate all the
 use cases but has to rely on the documentation.

 Ashish

 On Fri, Jun 19, 2015 at 7:10 AM, bit1...@163.com bit1...@163.com wrote:


 I think your observation is correct: you have to take care of the
 replayed data at your end, e.g. give each message a unique ID or something
 similar.

 I am using "I think" in the above sentence because I am not sure, and I
 also have a related question:
 I am wondering how direct stream + Kafka behaves when the driver is down
 and restarted: will it always first replay the checkpointed failed batch,
 or will it honor Kafka's offset reset policy (auto.offset.reset)? If it
 honors the reset policy and it is set to smallest, then it is at-least-once
 semantics; if it is set to largest, then it will be at-most-once semantics?


 --
 bit1...@163.com


 *From:* Haopu Wang hw...@qilinsoft.com
 *Date:* 2015-06-19 18:47
 *To:* Enno Shioji eshi...@gmail.com; Tathagata Das
 t...@databricks.com
 *CC:* prajod.vettiyat...@wipro.com; Cody Koeninger c...@koeninger.org;
 bit1...@163.com; Jordan Pilat jrpi...@gmail.com; Will Briggs
 wrbri...@gmail.com; Ashish Soni asoni.le...@gmail.com; ayan guha
 guha.a...@gmail.com; user@spark.apache.org; Sateesh Kavuri
 sateesh.kav...@gmail.com; Spark Enthusiast sparkenthusi...@yahoo.in;
 Sabarish Sasidharan sabarish.sasidha...@manthan.com
 *Subject:* RE: RE: Spark or Storm

 My question is not directly related: about the exactly-once semantics,
 the document (copied below) says Spark Streaming gives exactly-once
 semantics, but from my test results, with checkpointing enabled, the
 application always re-processes the files in the last batch after a
 graceful restart.



 ==
 *Semantics of Received Data*

 Different input sources provide different guarantees, ranging from *at-least
 once* to *exactly once*. Read for more details.
 *With Files*

 If all of the input data is already present in a fault-tolerant files
 system like HDFS, Spark Streaming can always recover from any failure and
 process all the data. This gives *exactly-once* semantics, that all the
 data will be processed exactly once no matter what fails.




  --

 *From:* Enno Shioji [mailto:eshi...@gmail.com]
 *Sent:* Friday, June 19, 2015 5:29 PM
 *To:* Tathagata Das
 *Cc:* prajod.vettiyat...@wipro.com; Cody Koeninger; bit1...@163.com;
 Jordan Pilat; Will Briggs; Ashish Soni; ayan guha; user@spark.apache.org;
 Sateesh Kavuri; Spark Enthusiast; Sabarish Sasidharan
 *Subject:* Re: RE: Spark or Storm



 Fair enough; on second thought, just saying that it should be idempotent
 is indeed more confusing.

 I guess the crux of the confusion comes from the fact that people tend
 to assume the work you described (store batch ID and skip etc.) is handled
 by the framework, perhaps partly because Storm Trident does handle it (you
 just need to let Storm know if the output operation has succeeded or not,
 and it handles the batch ID storing and skipping business). Whenever I
 explain to people that one needs to do this additional work you described to
 get end-to-end exactly-once semantics, it usually takes a while to convince
 them. In my limited experience, they tend to interpret "transactional" in
 that sentence to mean that you just have to write to a transactional
 storage like an ACID RDB. Pointing them to "Semantics of output operations"
 is usually sufficient though.



 Maybe others like

Re: createDirectStream and Stats

2015-06-19 Thread Tathagata Das
Yes, please tell us what operation you are using.

TD

On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org wrote:

 Is there any more info you can provide / relevant code?

 On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote:

 Update on performance of the new API: the new code using the
 createDirectStream API ran overnight and when I checked the app state in
 the morning, there were massive scheduling delays :(

 Not sure why and haven't investigated a whole lot. For now, switched back
 to the createStream API build of my app. Yes, for the record, this is with
 CDH 5.4.1 and Spark 1.3.



 On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote:

 Thanks for the super-fast response, TD :)

 I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera,
 are you listening? :D





 On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using Spark 1.3.x? That explains it. This issue has been fixed in
 Spark 1.4.0. Bonus: you get a fancy new streaming UI with more awesome
 stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 I just switched from createStream to the createDirectStream API
 for Kafka, and while things otherwise seem happy, the first thing I noticed
 is that stream/receiver stats are gone from the Spark UI :( Those stats
 were very handy for keeping an eye on the health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats even
 though I understand that createDirectStream is a receiver-less design.

 Thanks,

 Tim









Re: Assigning number of workers in spark streaming

2015-06-19 Thread Tathagata Das
All the basic parameters apply to both client and cluster modes. The only
difference between client and cluster mode is that the driver will be run
in the cluster, and there are some *additional* parameters to configure
that. Other params are common. Isn't that clear from the docs?

On Fri, Jun 19, 2015 at 2:54 PM, anshu shukla anshushuk...@gmail.com
wrote:

 Thanks a lot! But in client mode, can we assign the number of workers/nodes
 to be used for a particular application as a flag parameter to the
 spark-submit command? And by default, how will it distribute the load
 across the nodes?

 # Run on a Spark Standalone cluster in client deploy mode
 ./bin/spark-submit \
   --class org.apache.spark.examples.SparkPi \
   --master spark://207.184.161.138:7077 \
   --executor-memory 20G \
   --total-executor-cores 100 \
   /path/to/examples.jar \
   1000



 On Sat, Jun 20, 2015 at 3:18 AM, Tathagata Das t...@databricks.com
 wrote:

 It depends on what cluster manager you are using. It's all pretty well
 documented in the online documentation.

 http://spark.apache.org/docs/latest/submitting-applications.html

 On Fri, Jun 19, 2015 at 2:29 PM, anshu shukla anshushuk...@gmail.com
 wrote:

 Hey ,
 *[For Client Mode]*

 1. Is there any way to assign the number of workers from a cluster that
 should be used for a particular application?

 2. If not, then how does the Spark scheduler decide the scheduling of
 different applications inside one full logic?

 Say my logic has {inputStream ->
 wordsplitter -> wordcount -> statistical analysis};

 then on how many workers will it be scheduled?

 --
 Thanks  Regards,
 Anshu Shukla
 SERC-IISC





 --
 Thanks  Regards,
 Anshu Shukla



Re: Serial batching with Spark Streaming

2015-06-19 Thread Tathagata Das
);

 final JavaDStream<String> lines = context.union(
         context.receiverStream(new GeneratorReceiver()),
         ImmutableList.of(
                 context.receiverStream(new GeneratorReceiver()),
                 context.receiverStream(new GeneratorReceiver())));

 lines.print();

 final Accumulator<Integer> lineRddIndex =
         context.sparkContext().accumulator(0);
 lines.foreachRDD( rdd -> {
     lineRddIndex.add(1);
     final String prefix = "/tmp/" + String.format("%03d",
             lineRddIndex.localValue()) + "_lines_";
     try (final PrintStream out = new PrintStream(prefix +
             UUID.randomUUID())) {
         rdd.collect().forEach(s -> out.println(s));
     }
     return null;
 });

 final JavaDStream<String> words =
         lines.flatMap(x -> Arrays.asList(x.split(" ")));
 final JavaPairDStream<String, Integer> pairs =
         words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
 final JavaPairDStream<String, Integer> wordCounts =
         pairs.reduceByKey((i1, i2) -> i1 + i2);

 final Accumulator<Integer> sleep =
         context.sparkContext().accumulator(0);
 final JavaPairDStream<String, Integer> wordCounts2 =
         JavaPairDStream.fromJavaDStream(
                 wordCounts.transform( (rdd) -> {
                     sleep.add(1);
                     // 20s for the first five batches, 5s afterwards
                     Thread.sleep(sleep.localValue() < 6 ? 20000 : 5000);
                     return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd),
                             rdd.classTag());
                 }));

 final Function2<List<Integer>, Optional<Integer>,
         Optional<Integer>> updateFunction =
         (values, state) -> {
             Integer newSum = state.or(0);
             for (final Integer value : values) {
                 newSum += value;
             }
             return Optional.of(newSum);
         };

 final List<Tuple2<String, Integer>> tuples =
         ImmutableList.<Tuple2<String, Integer>>of();
 final JavaPairRDD<String, Integer> initialRDD =
         context.sparkContext().parallelizePairs(tuples);

 final JavaPairDStream<String, Integer> wordCountsState =
         wordCounts2.updateStateByKey(
                 updateFunction,
                 new HashPartitioner(context.sparkContext().defaultParallelism()),
                 initialRDD);

 wordCountsState.print();

 final Accumulator<Integer> rddIndex =
         context.sparkContext().accumulator(0);
 wordCountsState.foreachRDD( rdd -> {
     rddIndex.add(1);
     final String prefix = "/tmp/" + String.format("%03d",
             rddIndex.localValue()) + "_words_";
     try (final PrintStream out = new PrintStream(prefix +
             UUID.randomUUID())) {
         rdd.collect().forEach(s -> out.println(s));
     }
     return null;
 });

 context.start();
 context.awaitTermination();
 }


 On 17 June 2015 at 17:25, Tathagata Das t...@databricks.com wrote:
  The default behavior should be that batch X + 1 starts processing only
 after
  batch X completes. If you are using Spark 1.4.0, could you show us a
  screenshot of the streaming tab, especially the list of batches? And
 could
  you also tell us if you are setting any SparkConf configurations?
 
  On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia mici...@gmail.com
 wrote:
 
  Is it possible to achieve serial batching with Spark Streaming?
 
  Example:
 
  I configure the Streaming Context for creating a batch every 3
 seconds.
 
  Processing of the batch #2 takes longer than 3 seconds and creates a
  backlog of batches:
 
  batch #1 takes 2s
  batch #2 takes 10s
  batch #3 takes 2s
  batch #4 takes 2s
 
  When testing locally, it seems that processing of multiple batches is
  finished at the same time:
 
  batch #1 finished at 2s
  batch #2 finished at 12s
  batch #3 finished at 12s (processed in parallel)
  batch #4 finished at 15s
 
  How can I delay processing of the next batch, so that it is processed
  only after processing of the previous batch has been completed?
 
  batch #1 finished at 2s
  batch #2 finished at 12s
  batch #3 finished at 14s (processed serially)
  batch #4 finished at 16s
 
  I want to perform a transformation for every key only once in a given
  period of time (e.g. batch duration). I find all unique keys in a
  batch and perform the transformation on each key. To ensure that the
  transformation is done for every key only once, only one batch can be
  processed at a time. At the same time, I want that single batch to be
  processed in parallel.
 
  context = new JavaStreamingContext(conf, Durations.seconds(10));
  stream = context.receiverStream(...);
  stream
  .reduceByKey(...)
  .transform(...)
  .foreachRDD(output);
 
  Any ideas or pointers are very welcome.
 
  Thanks!
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Assigning number of workers in spark streaming

2015-06-19 Thread Tathagata Das
It depends on what cluster manager you are using. It's all pretty well
documented in the online documentation.

http://spark.apache.org/docs/latest/submitting-applications.html

On Fri, Jun 19, 2015 at 2:29 PM, anshu shukla anshushuk...@gmail.com
wrote:

 Hey ,
 *[For Client Mode]*

 1. Is there any way to assign the number of workers from a cluster that
 should be used for a particular application?

 2. If not, then how does the Spark scheduler decide the scheduling of
 different applications inside one full logic?

 Say my logic has {inputStream ->
 wordsplitter -> wordcount -> statistical analysis};

 then on how many workers will it be scheduled?

 --
 Thanks  Regards,
 Anshu Shukla
 SERC-IISC



Re: createDirectStream and Stats

2015-06-19 Thread Tathagata Das
I don't think there were any enhancements that could change this behavior.

On Fri, Jun 19, 2015 at 6:16 PM, Tim Smith secs...@gmail.com wrote:

 On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com
 wrote:

 Also, can you find from the Spark UI the breakup of the stages in each
 batch's jobs, and see which stage is taking more time after a while?


 Sure, will try to debug/troubleshoot. Are there enhancements to this
 specific API between 1.3 and 1.4 that can substantially change its
 behaviour?


 On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org
 wrote:

 when you say your old version was

 k = createStream .

 were you manually creating multiple receivers?  Because otherwise you're
 only using one receiver on one executor...


 Yes, sorry, the earlier/stable version was more like:
 kInStreams = (1 to n).map{ _ => KafkaUtils.createStream ... }  // n
 being the number of kafka partitions, 1 receiver per partition
 val k = ssc.union(kInStreams)
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 Thanks,

 Tim






 If that's the case I'd try direct stream without the repartitioning.


 On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote:

 Essentially, I went from:
 k = createStream .
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 To:
 kIn = createDirectStream .
 k = kIn.repartition(numberOfExecutors) // since #kafka partitions <
 #spark-executors
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 With the new API, the app starts up and works fine for a while but I
 guess starts to deteriorate after a while. With the existing API
 createStream, the app does deteriorate but over a much longer period,
 hours vs days.






 On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com
 wrote:

 Yes, please tell us what operation you are using.

 TD

 On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Is there any more info you can provide / relevant code?

 On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote:

 Update on performance of the new API: the new code using the
 createDirectStream API ran overnight and when I checked the app state in
 the morning, there were massive scheduling delays :(

 Not sure why and haven't investigated a whole lot. For now, switched
 back to the createStream API build of my app. Yes, for the record, this is
 with CDH 5.4.1 and Spark 1.3.



 On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com
 wrote:

 Thanks for the super-fast response, TD :)

 I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4.
 Cloudera, are you listening? :D





 On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using Spark 1.3.x? That explains it. This issue has been
 fixed in Spark 1.4.0. Bonus: you get a fancy new streaming UI with more
 awesome stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com
 wrote:

 Hi,

 I just switched from createStream to the createDirectStream
 API for kafka and while things otherwise seem happy, the first thing 
 I
 noticed is that stream/receiver stats are gone from the Spark UI :( 
 Those
 stats were very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats 
 even
 though I understand that createDirectStream is a receiver-less 
 design.

 Thanks,

 Tim














Re: createDirectStream and Stats

2015-06-19 Thread Tathagata Das
Also, can you find from the Spark UI the breakup of the stages in each
batch's jobs, and see which stage is taking more time after a while?



On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org wrote:

 when you say your old version was

 k = createStream .

 were you manually creating multiple receivers?  Because otherwise you're
 only using one receiver on one executor...

 If that's the case I'd try direct stream without the repartitioning.


 On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote:

 Essentially, I went from:
 k = createStream .
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 To:
 kIn = createDirectStream .
 k = kIn.repartition(numberOfExecutors) // since #kafka partitions <
 #spark-executors
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 With the new API, the app starts up and works fine for a while but I
 guess starts to deteriorate after a while. With the existing API
 createStream, the app does deteriorate but over a much longer period,
 hours vs days.






 On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com
 wrote:

 Yes, please tell us what operation you are using.

 TD

 On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Is there any more info you can provide / relevant code?

 On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote:

 Update on performance of the new API: the new code using the
 createDirectStream API ran overnight and when I checked the app state in
 the morning, there were massive scheduling delays :(

 Not sure why and haven't investigated a whole lot. For now, switched
 back to the createStream API build of my app. Yes, for the record, this is
 with CDH 5.4.1 and Spark 1.3.



 On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote:

 Thanks for the super-fast response, TD :)

 I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4.
 Cloudera, are you listening? :D





 On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using Spark 1.3.x? That explains it. This issue has been fixed
 in Spark 1.4.0. Bonus: you get a fancy new streaming UI with more awesome
 stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com
 wrote:

 Hi,

 I just switched from createStream to the createDirectStream API
 for kafka and while things otherwise seem happy, the first thing I 
 noticed
 is that stream/receiver stats are gone from the Spark UI :( Those stats
 were very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats even
 though I understand that createDirectStream is a receiver-less 
 design.

 Thanks,

 Tim












Re: RE: Spark or Storm

2015-06-19 Thread Tathagata Das
If the current documentation is confusing, we can definitely improve the
documentation. However, I do not understand why the term
"transactional" is confusing. If your output operation has to add 5, then
the user has to implement the following mechanism:

1. If the unique id of the batch of data is already present in the store,
then skip the update
2. Otherwise, atomically do both: the update operation, as well as storing
the unique id of the batch. This is pretty much the definition of a
transaction. The user has to be aware of the transactional semantics of the
data store while implementing this functionality.
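
A rough sketch of that mechanism in Scala; the store interface
(alreadyApplied, applyAtomically) is hypothetical and stands in for whatever
transactional API your data store actually provides:

    // Hypothetical transactional store; both methods are illustrative only.
    trait TxStore extends Serializable {
      def alreadyApplied(batchId: Long): Boolean
      def applyAtomically(batchId: Long, delta: Long): Unit // update + record id in one transaction
    }

    val store: TxStore = ??? // your data-store-specific implementation

    stream.foreachRDD { (rdd, time) =>
      val batchId = time.milliseconds // unique, reproducible id for this batch
      val delta   = rdd.count()       // the "add 5"-style increment to apply
      if (!store.alreadyApplied(batchId)) {   // 1. skip replayed batches
        store.applyAtomically(batchId, delta) // 2. update and record the id atomically
      }
    }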

You CAN argue that this effectively makes the whole update sort-of
idempotent, as even if you try doing it multiple times, it will update only
once. But that is not what is generally considered idempotent. Writing a
fixed count, not an increment, is usually what is called idempotent. And so
just mentioning that the output operation must be idempotent is, in my
opinion, more confusing.

To take a page out of the Storm / Trident guide, even they call this exact
conditional updating of Trident State a "transactional" operation. See
"transactional spout" in the Trident State guide -
https://storm.apache.org/documentation/Trident-state

In the end, I am totally open to suggestions and PRs on how to make the
programming guide easier to understand. :)

TD

On Thu, Jun 18, 2015 at 11:47 PM, Enno Shioji eshi...@gmail.com wrote:

 Tbh I find the doc around this a bit confusing. If it says "end-to-end
 exactly-once semantics (if your updates to downstream systems are
 idempotent or transactional)", I think most people will interpret it as:
 as long as you use storage which has atomicity (like MySQL/Postgres
 etc.), a successful output operation for a given batch (let's say "+ 5") is
 going to be issued exactly once against the storage.

 However, as I understand it, that's not what this statement means. What it
 is saying is, it will always issue "+5" and never, say, "+6", because it
 makes sure a message is processed exactly once internally. However, it
 *may* issue "+5" more than once for a given batch, and it is up to the
 developer to deal with this by either making the output operation
 idempotent (e.g. "set 5"), or transactional (e.g. keep track of batch IDs
 and skip already applied batches etc.).

 I wonder if it makes more sense to drop "or transactional" from the
 statement, because if you think about it, ultimately what you are asked to
 do is to make the writes idempotent even with the transactional approach,
 and "transactional" is a bit loaded and would be prone to lead to
 misunderstandings (even though, in fairness, the fault tolerance
 chapter explicitly explains it).



 On Fri, Jun 19, 2015 at 2:56 AM, prajod.vettiyat...@wipro.com wrote:

  More details on the Direct API of Spark 1.3 is at the databricks blog:
 https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html



 Note the use of checkpoints to persist the Kafka offsets in Spark
 Streaming itself, and not in zookeeper.



 Also this statement: “... This allows one to build a Spark Streaming +
 Kafka pipelines with end-to-end exactly-once semantics (if your updates to
 downstream systems are idempotent or transactional).”





 *From:* Cody Koeninger [mailto:c...@koeninger.org]
 *Sent:* 18 June 2015 19:38
 *To:* bit1...@163.com
 *Cc:* Prajod S Vettiyattil (WT01 - BAS); jrpi...@gmail.com;
 eshi...@gmail.com; wrbri...@gmail.com; asoni.le...@gmail.com; ayan guha;
 user; sateesh.kav...@gmail.com; sparkenthusi...@yahoo.in;
 sabarish.sasidha...@manthan.com
 *Subject:* Re: RE: Spark or Storm



 That general description is accurate, but not really a specific issue of
 the direct stream.  It applies to anything consuming from Kafka (or, as
 Matei already said, any streaming system really).  You can't have exactly
 once semantics, unless you know something more about how you're storing
 results.



 For some unique id, topicpartition and offset is usually the obvious
 choice, which is why it's important that the direct stream gives you access
 to the offsets.



 See https://github.com/koeninger/kafka-exactly-once for more info







 On Thu, Jun 18, 2015 at 6:47 AM, bit1...@163.com bit1...@163.com wrote:

  I am wondering how direct stream api ensures end-to-end exactly once
 semantics



 I think there are two things involved:

 1. From the Spark Streaming end, the driver will replay the offset range
 when it's down and restarted, which means that the new tasks will process
 some already-processed data.

 2. From the user end, since tasks may process already-processed data, the
 user end should detect that some data has already been processed, e.g.

 use some unique ID.



 Not sure if I have understood correctly.




  --

 bit1...@163.com



 *From:* prajod.vettiyat...@wipro.com

 *Date:* 2015-06-18 16:56

 *To:* jrpi...@gmail.com; eshi...@gmail.com

 *CC:* wrbri...@gmail.com; 

Re: understanding on the waiting batches and scheduling delay in Streaming UI

2015-06-18 Thread Tathagata Das
Also, could you give a screenshot of the streaming UI? Even better, could
you run it on Spark 1.4, which has a new streaming UI, and then use that for
debugging/screenshots?

TD

On Thu, Jun 18, 2015 at 3:05 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Which version of Spark, and what is your data source? For some reason,
 your processing delay is exceeding the batch duration. And it's strange that
 you are not seeing any scheduling delay.

 Thanks
 Best Regards

 On Thu, Jun 18, 2015 at 7:29 AM, Mike Fang chyfan...@gmail.com wrote:

 Hi,



 I have a Spark Streaming program that has been running for ~25 hrs. When I
 check the Streaming UI tab, I find that “Waiting batches” is 144, but the
 “scheduling delay” is 0. I am a bit confused.

 If “waiting batches” is 144, that means many batches are waiting in
 the queue to be processed? If this is the case, the scheduling delay should
 be high rather than 0. Am I missing anything?



 Thanks,

 Mike





Re: Latency between the RDD in Streaming

2015-06-18 Thread Tathagata Das
Why do you need to uniquely identify the message? All you need is the time
when the message was inserted by the receiver and when it is processed,
isn't it?


On Thu, Jun 18, 2015 at 2:28 PM, anshu shukla anshushuk...@gmail.com
wrote:

 Thanks a lot! But I have already tried the second way. The problem with
 that is how to identify a particular RDD from source to sink (as we can
 do by passing a msg id in Storm). For that I just updated the RDD and added
 a msgID (as a static variable), but while dumping them to file some of the
 tuples of the RDD are failed/missed (approx 3000, and the data rate is
 approx 1500 tuples/sec).

 On Fri, Jun 19, 2015 at 2:50 AM, Tathagata Das t...@databricks.com
 wrote:

 Couple of ways.

 1. Easy but approx way: Find the scheduling delay and processing time using
 the StreamingListener interface, and then calculate end-to-end delay = 0.5 *
 batch interval + scheduling delay + processing time. The 0.5 * batch
 interval is the approx average batching delay across all the records in the
 batch.

 2. Hard but precise way: You could build a custom receiver that embeds
 the current timestamp in the records, and then compare it with the
 timestamp at the final step of the records. Assuming the executor and
 driver clocks are reasonably in sync, this will measure the latency between
 the time a record is received by the system and the time the result from
 the record is available.

 On Thu, Jun 18, 2015 at 2:12 PM, anshu shukla anshushuk...@gmail.com
 wrote:

 Sorry, I missed the LATENCY word. For a large streaming query, how
 to find the time taken by a particular RDD to travel from the initial
 D-STREAM to the final/last D-STREAM?
 Help please!!

 On Fri, Jun 19, 2015 at 12:40 AM, Tathagata Das t...@databricks.com
 wrote:

 It's not clear what you are asking. Find what among RDDs?

 On Thu, Jun 18, 2015 at 11:24 AM, anshu shukla anshushuk...@gmail.com
 wrote:

 Is there any fixed way to find  among RDDs in stream processing
 systems, in the distributed set-up?

 --
 Thanks  Regards,
 Anshu Shukla





 --
 Thanks  Regards,
 Anshu Shukla





 --
 Thanks  Regards,
 Anshu Shukla



Re: createDirectStream and Stats

2015-06-18 Thread Tathagata Das
Are you using Spark 1.3.x? That explains it. This issue has been fixed in
Spark 1.4.0. Bonus: you get a fancy new streaming UI with more awesome
stats. :)

On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 I just switched from createStream to the createDirectStream API for
 Kafka, and while things otherwise seem happy, the first thing I noticed is
 that stream/receiver stats are gone from the Spark UI :( Those stats were
 very handy for keeping an eye on the health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats even
 though I understand that createDirectStream is a receiver-less design.
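
One common substitute is to count records per batch yourself and log or
export the numbers. A minimal sketch, assuming stream is the DStream
returned by createDirectStream and ssc is the StreamingContext:

    val totalRecords = ssc.sparkContext.accumulator(0L, "total records")

    stream.foreachRDD { (rdd, time) =>
      val batchCount = rdd.count()
      totalRecords += batchCount
      // Log per-batch throughput; feed it to your own metrics system if desired.
      println("batch %s: %d records (running total %d)"
        .format(time, batchCount, totalRecords.value))
    }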

 Thanks,

 Tim





Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-06-18 Thread Tathagata Das
Glad to hear that. :)

On Thu, Jun 18, 2015 at 6:25 AM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 We switched from ParallelGC to CMS, and the symptom is gone.

 On Thu, Jun 4, 2015 at 3:37 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I set spark.shuffle.io.preferDirectBufs to false in SparkConf, and this
 setting can be seen in the web UI's environment tab. But it still eats
 memory, i.e. -Xmx is set to 512M but RES grows to 1.5G in half a day.


 On Wed, Jun 3, 2015 at 12:02 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 Could you set spark.shuffle.io.preferDirectBufs to false to turn off
 the off-heap allocation of netty?

 Best Regards,
 Shixiong Zhu

 2015-06-03 11:58 GMT+08:00 Ji ZHANG zhangj...@gmail.com:

 Hi,

 Thanks for the information. I'll give Spark 1.4 a try when it's released.

 On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das t...@databricks.com
 wrote:

 Could you try it out with Spark 1.4 RC3?

 Also pinging, Cloudera folks, they may be aware of something.

 BTW, the way I have debugged memory leaks in the past is as follows.

 Run with a small driver memory, say 1 GB. Periodically (maybe with a
 script), take snapshots of the histogram and also do memory dumps, say every
 hour. Then compare the difference between two histos/dumps that are a few
 hours apart (the more the better). Diffing histos is easy. Diffing two dumps
 can be done in JVisualVM; it will show the diff in the objects that got
 added in the later dump. That makes it easy to debug what is not getting
 cleaned up.

 TD


 On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Thanks for your reply. Here are the top 30 entries of the jmap -histo:live
 result:

  num     #instances         #bytes  class name
 ----------------------------------------------
    1:         40802      145083848  [B
    2:         99264       12716112  methodKlass
    3:         99264       12291480  constMethodKlass
    4:          8472        9144816  constantPoolKlass
    5:          8472        7625192  instanceKlassKlass
    6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
    7:          7045        4804832  constantPoolCacheKlass
    8:        139168        4453376  java.util.HashMap$Entry
    9:          9427        3542512  methodDataKlass
   10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
   11:        135491        3251784  java.lang.Long
   12:         26192        2765496  [C
   13:          8131        1140560  [Ljava.util.HashMap$Entry;
   14:          8997        1061936  java.lang.Class
   15:         16022         851384  [[I
   16:         16447         789456  java.util.zip.Inflater
   17:         13855         723376  [S
   18:         17282         691280  java.lang.ref.Finalizer
   19:         25725         617400  java.lang.String
   20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
   21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
   22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
   23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
   24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
   25:         16447         394728  java.util.zip.ZStreamRef
   26:           565         370080  [I
   27:           508         272288  objArrayKlassKlass
   28:         16233         259728  java.lang.Object
   29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
   30:          2524         192312  [Ljava.lang.Object;

 But as I mentioned above, the heap memory seems OK; the extra memory
 is consumed by some off-heap data. I can't find a way to figure out what is
 in there.

 Besides, I did some extra experiments, i.e. ran the same program in
 different environments to test whether it has an off-heap memory issue:

 spark1.0 + standalone => no
 spark1.0 + yarn => no
 spark1.3 + standalone => no
 spark1.3 + yarn => yes

 I'm using CDH5.1, so the spark1.0 is provided by cdh, and
 spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.

 I could use spark1.0 + yarn, but I can't find a way to handle the
 logs (level and rolling), so it'll explode the hard drive.

 Currently I'll stick to spark1.0 + standalone, until our ops team
 decides to upgrade cdh.



 On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com
 wrote:

 While it is running, is it possible for you to log into the YARN
 node and get histograms of live objects using jmap -histo:live? That may
 reveal something.
 reveal something.


 On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Unfortunately, they're still growing, both driver and executors.

 I ran the same job in local mode, and everything is fine.

 On Thu, May 28, 2015 at 5:26 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 Can you replace your counting part with this?

 logs.filter

Re: [Spark Streaming] Runtime Error in call to max function for JavaPairRDD

2015-06-18 Thread Tathagata Das
I think you may be including a different version of Spark Streaming in your
assembly. Please mark spark-core and spark-streaming as provided
dependencies. Any installation of Spark will automatically provide Spark in
the classpath, so you do not have to bundle it.

On Thu, Jun 18, 2015 at 8:44 AM, Nipun Arora nipunarora2...@gmail.com
wrote:

 Hi,

 I have the following piece of code, where I am trying to transform a Spark
 stream and add the min and max of each RDD to it. However, at run time I
 get an error saying the max call does not exist (it compiles properly). I
 am using Spark 1.4.

 I have added the question to stackoverflow as well:
 http://stackoverflow.com/questions/30902090/adding-max-and-min-in-spark-stream-in-java/30909796#30909796

 Any help is greatly appreciated :)

 Thanks
 Nipun

 JavaPairDStream<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>
 sortedtsStream = transformedMaxMintsStream.transformToPair(new Sort2());

 sortedtsStream.foreach(
     new Function<JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long,
 Long>>, Void>() {
         @Override
         public Void call(JavaPairRDD<Tuple2<Long, Integer>,
 Tuple3<Integer, Long, Long>> tuple2Tuple3JavaPairRDD) throws Exception {
             List<Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>>
 templist = tuple2Tuple3JavaPairRDD.collect();
             for (Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>
 tuple : templist) {

                 Date date = new Date(tuple._1._1);
                 int pattern = tuple._1._2;
                 int count = tuple._2._1();
                 Date maxDate = new Date(tuple._2._2());
                 Date minDate = new Date(tuple._2._2());
                 System.out.println("TimeSlot: " + date.toString() +
 " Pattern: " + pattern + " Count: " + count + " Max: " +
 maxDate.toString() + " Min: " + minDate.toString());

             }
             return null;
         }
     }
 );

 Error:


 15/06/18 11:05:06 INFO BlockManagerInfo: Added input-0-1434639906000 in 
 memory on localhost:42829 (size: 464.0 KB, free: 264.9 MB)15/06/18 11:05:06 
 INFO BlockGenerator: Pushed block input-0-1434639906000Exception in thread 
 JobGenerator java.lang.NoSuchMethodError: 
 org.apache.spark.api.java.JavaPairRDD.max(Ljava/util/Comparator;)Lscala/Tuple2;
 at 
 org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:346)
 at 
 org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:340)
 at 
 org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$3(JavaDStreamLike.scala:360)
 at 
 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
 at 
 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
 at 
 org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonf




Re: Latency between the RDD in Streaming

2015-06-18 Thread Tathagata Das
It's not clear what you are asking. Find what among RDDs?

On Thu, Jun 18, 2015 at 11:24 AM, anshu shukla anshushuk...@gmail.com
wrote:

 Is there any fixed way to find  among RDDs in stream processing systems,
 in the distributed set-up?

 --
 Thanks  Regards,
 Anshu Shukla



Re: Latency between the RDD in Streaming

2015-06-18 Thread Tathagata Das
Couple of ways.

1. Easy but approx way: Find the scheduling delay and processing time using
the StreamingListener interface (sketched below), and then calculate
end-to-end delay = 0.5 * batch interval + scheduling delay + processing
time. The 0.5 * batch interval is the approx average batching delay across
all the records in the batch.

2. Hard but precise way: You could build a custom receiver that embeds the
current timestamp in the records, and then compare it with the timestamp
at the final step of the records. Assuming the executor and driver clocks
are reasonably in sync, this will measure the latency between the time a
record is received by the system and the time the result from the record is
available.
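
A minimal sketch of approach 1, assuming a 10-second batch interval
(onBatchCompleted, schedulingDelay, and processingDelay are part of the
public StreamingListener/BatchInfo API):

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    class LatencyListener(batchIntervalMs: Long) extends StreamingListener {
      override def onBatchCompleted(done: StreamingListenerBatchCompleted): Unit = {
        val info = done.batchInfo
        val schedulingDelay = info.schedulingDelay.getOrElse(0L) // ms
        val processingTime  = info.processingDelay.getOrElse(0L) // ms
        // 0.5 * batch interval approximates the average batching delay per record.
        val endToEnd = 0.5 * batchIntervalMs + schedulingDelay + processingTime
        println("batch %s: approx end-to-end delay %.0f ms".format(info.batchTime, endToEnd))
      }
    }

    // Register before ssc.start():
    // ssc.addStreamingListener(new LatencyListener(10000))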

On Thu, Jun 18, 2015 at 2:12 PM, anshu shukla anshushuk...@gmail.com
wrote:

 Sorry, I missed the LATENCY word. For a large streaming query, how to
 find the time taken by a particular RDD to travel from the initial
 D-STREAM to the final/last D-STREAM?
 Help please!!

 On Fri, Jun 19, 2015 at 12:40 AM, Tathagata Das t...@databricks.com
 wrote:

 It's not clear what you are asking. Find what among RDDs?

 On Thu, Jun 18, 2015 at 11:24 AM, anshu shukla anshushuk...@gmail.com
 wrote:

 Is there any fixed way to find  among RDDs in stream processing systems,
 in the distributed set-up?

 --
 Thanks  Regards,
 Anshu Shukla





 --
 Thanks  Regards,
 Anshu Shukla



Re: Serial batching with Spark Streaming

2015-06-17 Thread Tathagata Das
The default behavior should be that batch X + 1 starts processing only
after batch X completes. If you are using Spark 1.4.0, could you show us a
screenshot of the streaming tab, especially the list of batches? And could
you also tell us if you are setting any SparkConf configurations?

On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia mici...@gmail.com wrote:

 Is it possible to achieve serial batching with Spark Streaming?

 Example:

 I configure the Streaming Context for creating a batch every 3 seconds.

 Processing of the batch #2 takes longer than 3 seconds and creates a
 backlog of batches:

 batch #1 takes 2s
 batch #2 takes 10s
 batch #3 takes 2s
 batch #4 takes 2s

 When testing locally, it seems that processing of multiple batches is
 finished at the same time:

 batch #1 finished at 2s
 batch #2 finished at 12s
 batch #3 finished at 12s (processed in parallel)
 batch #4 finished at 15s

 How can I delay processing of the next batch, so that it is processed
 only after processing of the previous batch has been completed?

 batch #1 finished at 2s
 batch #2 finished at 12s
 batch #3 finished at 14s (processed serially)
 batch #4 finished at 16s

 I want to perform a transformation for every key only once in a given
 period of time (e.g. batch duration). I find all unique keys in a
 batch and perform the transformation on each key. To ensure that the
 transformation is done for every key only once, only one batch can be
 processed at a time. At the same time, I want that single batch to be
 processed in parallel.

 context = new JavaStreamingContext(conf, Durations.seconds(10));
 stream = context.receiverStream(...);
 stream
 .reduceByKey(...)
 .transform(...)
 .foreachRDD(output);

 Any ideas or pointers are very welcome.

 Thanks!

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark or Storm

2015-06-17 Thread Tathagata Das
To add more information beyond what Matei said and answer the original
question, here are other things to consider when comparing Spark
Streaming and Storm.

* Unified programming model and semantics - On most occasions you have to
process the same data again in batch jobs. If you have two separate systems
for batch and streaming, it's much, much harder to share the code. You will
have to deal with different processing models, with their own semantics.
Compare Storm's "join" vs. doing a usual batch join, whereas Spark and
Spark Streaming share the same join semantics, as they are based on the
same RDD model underneath.

* Integration with Spark ecosystem - Many people really want to go beyond
basic streaming ETL and into advanced streaming analytics.
  - Combine stream processing with static datasets
  - Apply dynamically updated machine learning models (i.e. offline
learning and online prediction, or even continuous learning and
prediction),
  - Apply DataFrame and SQL operation with streaming
 These things are pretty easy to do with the Spark ecosystem (see the sketch after this list).

* Operational management - You have to consider the operational cost of
managing two separate systems for batch and stream processing (with their
own deployment models), vs managing one single engine with one deployment
model.

* Performance - According to Intel's independent study, Spark Streaming in
Kafka direct mode can have 2.5-3x the throughput of Trident with a 0.5GB
batch size. And at that batch size, the latency of Trident is 30 seconds,
compared to 1.5 seconds for Spark Streaming. This is from a talk that Intel
gave in the Spark Summit (https://spark-summit.org/2015/) two days ago.
Slides will be available soon, but here is a sneak peek - throughput -
http://i.imgur.com/u6pf4rB.png   and latency - http://imgur.com/c46MJ4i
I will post the link to the slides when it comes out, hopefully next week.
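
For example, combining a stream with a static dataset (the first sub-bullet
above) is essentially a one-liner via transform. A minimal sketch, assuming
pairStream is a DStream of key-value pairs and staticRdd is a pre-loaded
pair RDD with the same key type:

    // Join every batch of the keyed stream against a static reference dataset.
    val enriched = pairStream.transform { rdd => rdd.join(staticRdd) }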



On Wed, Jun 17, 2015 at 11:55 AM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 The major difference is that in Spark Streaming, there's no *need* for a
 TridentState for state inside your computation. All the stateful operations
 (reduceByWindow, updateStateByKey, etc) automatically handle exactly-once
 processing, keeping updates in order, etc. Also, you don't need to run a
 separate transactional system (e.g. MySQL) to store the state.
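
For example, a windowed running count needs no external state store; a
sketch, assuming pairs is a DStream[(String, Int)] and checkpointing is
enabled:

    import org.apache.spark.streaming.Seconds

    // Count per key over the last 30 seconds, recomputed every 10 seconds.
    // The windowed state is recovered from checkpoints after a failure.
    val windowedCounts = pairs.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b, Seconds(30), Seconds(10))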

 After your computation runs, if you want to write the final results (e.g.
 the counts you've been tracking) to a storage system, you use one of the
 output operations (saveAsFiles, foreach, etc). Those actually will run in
 order, but some might run multiple times if nodes fail, thus attempting to
 write the same state again.

 You can read about how it works in this research paper:
 http://people.csail.mit.edu/matei/papers/2013/sosp_spark_streaming.pdf.

 Matei

 On Jun 17, 2015, at 11:49 AM, Enno Shioji eshi...@gmail.com wrote:

 Hi Matei,


 Ah, can't get more accurate than from the horse's mouth... If you don't
 mind helping me understand it correctly..

 From what I understand, Storm Trident does the following (when used with
 Kafka):
 1) Sit on Kafka Spout and create batches
 2) Assign global sequential ID to the batches
 3) Make sure that all results of processed batches are written once to
 TridentState, *in order* (for example, by skipping batches that were
 already applied once, ultimately by using Zookeeper)

 TridentState is an interface that you have to implement, and the
 underlying storage has to be transactional for this to work. The necessary
 skipping etc. is handled by Storm.

 In case of Spark Streaming, I understand that
 1) There is no global ordering; e.g. an output operation for a batch
 consisting of offsets [4,5,6] can be invoked before the operation for
 offsets [1,2,3]
 2) If you wanted to achieve something similar to what TridentState does,
 you'll have to do it yourself (for example using Zookeeper)

 Is this a correct understanding?




 On Wed, Jun 17, 2015 at 7:14 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 This documentation is only for writes to an external system, but all the
 counting you do within your streaming app (e.g. if you use
 reduceByKeyAndWindow to keep track of a running count) is exactly-once.
 When you write to a storage system, no matter which streaming framework you
 use, you'll have to make sure the writes are idempotent, because the
 storage system can't know whether you meant to write the same data again or
 not. But the place where Spark Streaming helps over Storm, etc is for
 tracking state within your computation. Without that facility, you'd not
 only have to make sure that writes are idempotent, but you'd have to make
 sure that updates to your own internal state (e.g. reduceByKeyAndWindow)
 are exactly-once too.

 Matei


 On Jun 17, 2015, at 8:26 AM, Enno Shioji eshi...@gmail.com wrote:

 The thing is, even with that improvement, you still have to make updates
 idempotent or transactional yourself. If you read
 

Re: Spark Streaming reads from stdin or output from command line utility

2015-06-12 Thread Tathagata Das
Is it a lot of data that is expected to come through stdin? I mean, is it
even worth parallelizing the computation using something like Spark
Streaming?
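
If it is worth it, one option is a custom receiver that launches the
utility on an executor and stores its stdout lines. A rough sketch (the
Receiver API is real; the command, error handling, and restart logic are
illustrative or elided):

    import scala.io.Source
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class CommandReceiver(command: Seq[String])
        extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

      override def onStart(): Unit = {
        new Thread("command-receiver") {
          override def run(): Unit = {
            val proc = new ProcessBuilder(command: _*).start()
            // Push each line the utility writes to stdout into the DStream.
            Source.fromInputStream(proc.getInputStream).getLines().foreach(store)
          }
        }.start()
      }

      override def onStop(): Unit = { /* stop the thread / destroy the process */ }
    }

    // val lines = ssc.receiverStream(new CommandReceiver(Seq("my-generator", "--stream")))

Note that the receiver (and hence the utility) runs on an executor, not on
the driver's stdin.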

On Thu, Jun 11, 2015 at 9:56 PM, Heath Guo heath...@fb.com wrote:

  Thanks for your reply! In my use case, it would be a stream from only one
 stdin. Also, I'm working with Scala.
 It would be great if you could talk about the multi-stdin case as well!
 Thanks.

   From: Tathagata Das t...@databricks.com
 Date: Thursday, June 11, 2015 at 8:11 PM
 To: Heath Guo heath...@fb.com
 Cc: user user@spark.apache.org
 Subject: Re: Spark Streaming reads from stdin or output from command line
 utility

   Are you going to receive data from one stdin from one machine, or many
 stdins on many machines?


 On Thu, Jun 11, 2015 at 7:25 PM, foobar heath...@fb.com wrote:

 Hi, I'm new to Spark Streaming, and I want to create an application where
 Spark Streaming can create a DStream from stdin. Basically I have a
 command
 line utility that generates stream data, and I'd like to pipe data into a
 DStream. What's the best way to do that? I thought rdd.pipe() could help,
 but it seems that requires an RDD in the first place, which does not
 apply.
 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reads-from-stdin-or-output-from-command-line-utility-tp23289.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Re: How to keep a SQLContext instance alive in a spark streaming application's life cycle?

2015-06-12 Thread Tathagata Das
BTW, in Spark 1.4 announced today, I added SQLContext.getOrCreate, so you
don't need to create the singleton yourself.
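
With that, the per-RDD pattern in this thread reduces to roughly the
following sketch (jsonRDD and registerTempTable as in the code further down
the thread):

    import org.apache.spark.sql.SQLContext

    dstream.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        // Returns the one existing SQLContext, creating it on first use.
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        sqlContext.jsonRDD(rdd).registerTempTable("ttable")
        sqlContext.sql("SELECT COUNT(*) FROM ttable").collect().foreach(row => println(row(0)))
      }
    }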

On Wed, Jun 10, 2015 at 3:21 AM, Sergio Jiménez Barrio 
drarse.a...@gmail.com wrote:

 Note: CCing user@spark.apache.org


 First, you must check if the RDD is empty:

  messages.foreachRDD { rdd =>
  if (!rdd.isEmpty) { } }

 Now, you can obtain the instance of a SQLContext:

 val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)




 *Optional*
 At this moment, I like to work with DataFrames, so I convert the RDD to a
 DataFrame. I see that you receive JSON:

 val df: DataFrame = sqlContext.jsonRDD(message,
 getSchema(getSchemaStr)).toDF()


 My getSchema function create a Schema of my JSON:

 def getSchemaStr(): String = "feature1 feature2 ..."

 def getSchema(schema: String): StructType = StructType(schema.split(" ")
 .map(fieldName => StructField(fieldName, StringType, true)))

 I hope this helps.

 Regards.



 2015-06-09 17:36 GMT+02:00 codingforfun [via Apache Spark User List] 
 ml-node+s1001560n23226...@n3.nabble.com:

 I don't know why you said "Why? I tried this solution and it works fine."
 Does that mean your SQLContext instance is alive for the whole streaming
 application's lifetime, rather than one batch duration? My code is below:


 object SQLContextSingleton extends java.io.Serializable{
   @transient private var instance: SQLContext = null

   // Instantiate SQLContext on demand
   def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
 if (instance == null) {
   instance = new SQLContext(sparkContext)
 }
 instance
   }
 }

 // type_ -> typex, id_ -> id, url_ -> url
 case class  (time: Timestamp, id: Int, openfrom: Int, tab: Int) extends
 Serializable
 case class Count(x: Int)

 @transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
 ssc.checkpoint(".")

 val kafkaParams = Map("metadata.broker.list" -> "10.20.30.40:9092", ...)
 @transient val dstream = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))
 @transient val dddstream = newsIdDStream.map(x => x._2).flatMap(x =>
 x.split("\n"))

 dddstream.foreachRDD { rdd =>
 SQLContextSingleton.getInstance(rdd.sparkContext).jsonRDD(rdd).registerTempTable("ttable")
 val ret = SQLContextSingleton.getInstance(rdd.sparkContext).sql("SELECT
 COUNT(*) FROM ttable")
 ret.foreach { x => println(x(0)) }
 }

 ssc.start()
 ssc.awaitTermination()






 On 2015-06-09 17:41:44, drarse [via Apache Spark User List] [hidden
 email] wrote:

 Why? I tried this solution and it works fine.

 On Tuesday, June 9, 2015, codingforfun [via Apache Spark User List]
 [hidden email] wrote:

 Hi drarse, thanks for replying. The way you suggested, using a singleton
 object, does not work.




 On 2015-06-09 16:24:25, drarse [via Apache Spark User List] wrote:

 The best way is create a singleton object like:

 object SQLContextSingleton {
   @transient private var instance: SQLContext = null

   // Instantiate SQLContext on demand
   def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
     if (instance == null) {
       instance = new SQLContext(sparkContext)
     }
     instance
   }
 }

  There is more information in the programming guide:

 https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations



 2015-06-09 9:27 GMT+02:00 codingforfun [via Apache Spark User List]:

 I used SQLContext in a spark streaming application as below:

 case class topic_name(f1: Int, f2: Int)

 val sqlContext = new SQLContext(sc)
 @transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
 ssc.checkpoint(".")
 val theDStream = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic_name))

 theDStream.map(x => x._2).foreachRDD { rdd =>
   sqlContext.jsonRDD(rdd).registerTempTable("topic_name")
   sqlContext.sql("select count(*) from topic_name").foreach { x =>
     WriteToFile(file_path, x(0).toString)
   }
 }

 ssc.start()
 ssc.awaitTermination()


 I found I could only get each 5-second batch's message count, because "The
 lifetime of this temporary table is tied to the SQLContext that was used to
 create this DataFrame". I guess every 5 seconds a new sqlContext is created
 and the temporary table only lives for those 5 seconds. I want the
 sqlContext and the temporary table to stay alive for the whole streaming
 application's life cycle. How can I do that?

 Thanks~


Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-11 Thread Tathagata Das
Let me try to add some clarity to the different threads of thought going on
in this discussion.

1. HOW TO DETECT THE NEED FOR MORE CLUSTER RESOURCES?

If there are no rate limits set up, the most reliable way to detect whether
the current Spark cluster is insufficient to handle the data load is to use
the StreamingListener interface, which gives all the information about when
batches start and end. See the internal implementation of the
StreamingListener called StreamingJobProgressListener; this is the one that
drives the streaming UI. You can get the scheduling delay (the time a batch
waits before it starts processing) from it and use that as a reliable
indicator that Spark Streaming is not able to process as fast as data is
being received.

But if you have already set rate limits based on the max load the cluster
can handle, then you will probably never detect that the actual input rate
into Kafka has gone up and data is getting buffered inside Kafka. In that
case, you have to monitor Kafka load to correctly detect the high load. You
may want to use a combination of both techniques for a robust and safe
elastic solution: have rate limits set, use the StreamingListener to detect
early that the processing load is increasing (it can increase without an
actual increase in data rate), and also make sure from Kafka monitoring that
the whole end-to-end system is keeping up.
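
For illustration, a minimal sketch of such a listener (the 5000 ms threshold is an arbitrary choice, and ssc is your StreamingContext):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class DelayMonitor extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    // schedulingDelay = how long the batch waited before it started processing
    batch.batchInfo.schedulingDelay.foreach { delayMs =>
      if (delayMs > 5000) {
        println(s"Falling behind: batch waited $delayMs ms before starting")
      }
    }
  }
}

// Register it on the StreamingContext:
// ssc.addStreamingListener(new DelayMonitor)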


2. HOW TO GET MORE CLUSTER RESOURCES?

Currently for YARN, you can use the developer API of dynamic allocation
that Andrew Or has introduced to ask for more executors from YARN. Note
that the existing dynamic allocation solution is unlikely to work for
streaming, and should not be used. Rather, I recommend building your own
logic that watches the streaming scheduling delay and accordingly uses the
low-level developer API to directly ask for more executors
(sparkContext.requestExecutors). The Provisioning Component idea mentioned
in other approaches can also work.
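
A rough sketch of wiring the two together (requestExecutors is a developer API and subject to change; the condition name and the count of 2 are assumptions):

// Inside your own monitoring logic, e.g. driven by the listener above:
if (schedulingDelayKeepsGrowing) {       // your own detection condition
  ssc.sparkContext.requestExecutors(2)   // ask YARN for 2 more executors
}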


3. HOW TO TAKE ADVANTAGE OF MORE CLUSTER RESOURCES?

There are two approaches depending on receiver vs. Kafka direct. I am
assuming the number of topic partitions is pre-determined to be large enough
to handle peak load.

(a) Kafka Direct: This is the simpler scenario.  Since there are no
receivers, if the cluster gets a new executor, it will automatically start
getting used to run tasks, including reading from Kafka (remember, Kafka
direct approach reads from Kafka like a file system, from any node that
runs the task). So it will immediately start using the extra resources, no
need to do anything further.

(b) Receiver: This is definitely tricky. If you don't need to increase the
number of receivers, then a new executor will start getting used for
computations (shuffles, writing out, etc.), but the parallelism in
receiving will not increase. If you need to increase that, then it's best to
shut down the context gracefully (so that no data is lost), and a new
StreamingContext can be started with more receivers (# receivers <= #
executors), and maybe more #partitions for shuffles. You have to call stop()
on the currently running streaming context to start a new one. If a context
is stopped, any thread stuck in awaitTermination will get unblocked.

Does that clarify things?







On Thu, Jun 11, 2015 at 7:30 AM, Cody Koeninger c...@koeninger.org wrote:

 Depends on what you're reusing multiple times (if anything).

 Read
 http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence

 On Wed, Jun 10, 2015 at 12:18 AM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 At which point would I call cache()?  I just want the runtime to spill to
 disk when necessary, without me having to know when that is.


 On Thu, Jun 4, 2015 at 9:42 AM, Cody Koeninger c...@koeninger.org
 wrote:

 direct stream isn't a receiver, it isn't required to cache data anywhere
 unless you want it to.

 If you want it, just call cache.

 On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 set the storage policy for the DStream RDDs to MEMORY AND DISK - it
 appears the storage level can be specified in the createStream methods but
 not createDirectStream...
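
For the receiver-based stream, the storage level is the last argument of createStream; a sketch, with the ZK quorum, group, and topic values assumed:

  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.kafka.KafkaUtils

  // Received blocks spill to disk instead of being dropped when memory fills up
  val lines = KafkaUtils.createStream(
    ssc, "zkhost:2181", "mygroup", Map("mytopic" -> 1),
    StorageLevel.MEMORY_AND_DISK_SER)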


 On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 You can also try Dynamic Resource Allocation




 https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation



 Also, re the Feedback Loop for automatic message consumption rate
 adjustment – there is a “dumb” solution option – simply set the storage
 policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
 exhausted Spark Streaming will resort to keeping new RDDs on disk, which
 will prevent it from crashing and hence losing them. Then some memory will
 get freed and it will resort back to RAM, and so on and so forth.





 Sent from Samsung Mobile

  Original message 

 From: Evo Eftimov

 Date:2015/05/28 13:22 (GMT+00:00)

 To: Dmitry Goldenberg

 Cc: Gerard Maas ,spark users

 Subject: Re: Autoscaling Spark 

Re: Spark Streaming reads from stdin or output from command line utility

2015-06-11 Thread Tathagata Das
Are you going to receive data from one stdin from one machine, or many
stdins on many machines?


On Thu, Jun 11, 2015 at 7:25 PM, foobar heath...@fb.com wrote:

 Hi, I'm new to Spark Streaming, and I want to create an application where
 Spark Streaming creates a DStream from stdin. Basically, I have a command-line
 utility that generates stream data, and I'd like to pipe data into a
 DStream. What's the best way to do that? I thought rdd.pipe() could help,
 but that requires an RDD in the first place, which does not apply.
 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reads-from-stdin-or-output-from-command-line-utility-tp23289.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Shutdown with streaming driver running in cluster broke master web UI permanently

2015-06-11 Thread Tathagata Das
Do you have the event logging enabled?

TD

On Thu, Jun 11, 2015 at 11:24 AM, scar0909 scar0...@gmail.com wrote:

 I have the same problem. I realized that the Spark master becomes
 unresponsive when we kill the leader ZooKeeper (of course, I assigned the
 leader election task to ZooKeeper). Please let me know if you have any
 developments.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Shutdown-with-streaming-driver-running-in-cluster-broke-master-web-UI-permanently-tp4149p23284.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Join between DStream and Periodically-Changing-RDD

2015-06-11 Thread Tathagata Das
Another approach not mentioned is to use a function to get the RDD that is
to be joined. Something like this:

kvDstream.foreachRDD { rdd =>
  val refRdd = getOrUpdateRDD(params...)  // returns the RDD to join against
  rdd.join(refRdd)
}

The getOrUpdateRDD() function that you implement will get called every
batch interval, and you can decide whether to return the same RDD or an
updated one. Once updated, if the RDD is going to be used in multiple
batch intervals, you should cache it. Furthermore, if you are going to join
it, you should partition it by a partitioner, then cache it, and make sure
that the same partitioner is used for joining. That would be more
efficient, as the RDD will stay partitioned in memory, minimizing the cost
of the join.
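
A hedged sketch of what getOrUpdateRDD could look like (refPath, refreshIntervalMs, and the comma-separated key,value file format are all assumptions for illustration):

import org.apache.spark.{Partitioner, SparkContext}
import org.apache.spark.rdd.RDD

object RefData {
  @transient private var cached: RDD[(String, String)] = null
  private var lastLoad = 0L

  // Reload the reference RDD at most once every refreshIntervalMs;
  // keep it partitioned and cached in between, so joins stay cheap.
  def getOrUpdateRDD(sc: SparkContext, refPath: String, partitioner: Partitioner,
                     refreshIntervalMs: Long): RDD[(String, String)] = synchronized {
    val now = System.currentTimeMillis()
    if (cached == null || now - lastLoad > refreshIntervalMs) {
      if (cached != null) cached.unpersist()
      cached = sc.textFile(refPath)
        .map { line => val f = line.split(","); (f(0), f(1)) }
        .partitionBy(partitioner)  // use the same partitioner in the join
        .cache()
      lastLoad = now
    }
    cached
  }
}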


On Wed, Jun 10, 2015 at 9:08 AM, Evo Eftimov evo.efti...@isecc.com wrote:

 It depends on how big the Batch RDD requiring reloading is



  Reloading it for EVERY single DStream RDD would slow down the stream
  processing, in line with the total time required to reload the Batch RDD.



  But if the Batch RDD is not that big, then that might not be an issue,
  especially in the context of the latency requirements for your streaming app.



  Another, more efficient and real-time approach may be to represent your
  Batch RDD as DStream RDDs and keep aggregating them over the lifetime of
  the Spark Streaming app instance, joining them with the actual DStream
  RDDs.



 You can feed your HDFS file into a Message Broker topic and consume it
 from there in the form of DStream RDDs which you keep aggregating over the
 lifetime of the spark streaming app instance



 *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
 *Sent:* Wednesday, June 10, 2015 8:36 AM
 *To:* Ilove Data
 *Cc:* user@spark.apache.org
 *Subject:* Re: Join between DStream and Periodically-Changing-RDD



 RDD's are immutable, why not join two DStreams?



 Not sure, but you can try something like this also:



  kvDstream.foreachRDD(rdd => {

    val file = ssc.sparkContext.textFile("/sigmoid/")
    val kvFile = file.map(x => (x.split(",")(0), x))

    rdd.join(kvFile)

  })




 Thanks

 Best Regards



 On Tue, Jun 9, 2015 at 7:37 PM, Ilove Data data4...@gmail.com wrote:

 Hi,



  I'm trying to join a DStream with an interval of, let's say, 20s against an
  RDD loaded from an HDFS folder that changes periodically; let's say a new
  file arrives in the folder every 10 minutes.



  How should it be done, considering that the HDFS files in the folder are
  periodically changing and new files are being added? Does an RDD
  automatically detect changes in the HDFS folder it was loaded from and
  reload itself?



 Thanks!

 Rendy





Re: Spark SQL and Streaming Results

2015-06-05 Thread Tathagata Das
You could take a look at the RDD *Async operations and their source code.
Maybe that can help in getting some early results.

TD

On Fri, Jun 5, 2015 at 8:41 AM, Pietro Gentile 
pietro.gentile89.develo...@gmail.com wrote:

 Hi all,


 what is the best way to perform Spark SQL queries and obtain the result
 tuples in a streaming way? In particular, I want to aggregate data and
 obtain the first, incomplete results quickly, with updates until the
 aggregation is complete.

 Best Regards.



Re: Spark SQL and Streaming Results

2015-06-05 Thread Tathagata Das
I am not sure. Saisai may be able to say more about it.

TD

On Fri, Jun 5, 2015 at 5:35 PM, Todd Nist tsind...@gmail.com wrote:

 There used to be a project, StreamSQL (
 https://github.com/thunderain-project/StreamSQL), but it appears a bit
 dated and I do not see it in the Spark repo, though I may have missed it.

 @TD Is this project still active?

 I'm not sure what the status is, but it may provide some insights on how to
 achieve what you're looking to do.

 On Fri, Jun 5, 2015 at 6:34 PM, Tathagata Das t...@databricks.com wrote:

 You could take a look at the RDD *Async operations and their source code.
 Maybe that can help in getting some early results.

 TD

 On Fri, Jun 5, 2015 at 8:41 AM, Pietro Gentile 
 pietro.gentile89.develo...@gmail.com wrote:

 Hi all,


 what is the best way to perform Spark SQL queries and obtain the result
 tuples in a streaming way? In particular, I want to aggregate data and
 obtain the first, incomplete results quickly, with updates until the
 aggregation is complete.

 Best Regards.






Re: Roadmap for Spark with Kafka on Scala 2.11?

2015-06-04 Thread Tathagata Das
But compile scope is supposed to be added to the assembly.
https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Scope



On Thu, Jun 4, 2015 at 1:24 PM, algermissen1971 algermissen1...@icloud.com
wrote:

 Hi Iulian,

 On 26 May 2015, at 13:04, Iulian Dragoș iulian.dra...@typesafe.com
 wrote:

 
  On Tue, May 26, 2015 at 10:09 AM, algermissen1971 
 algermissen1...@icloud.com wrote:
  Hi,
 
  I am setting up a project that requires Kafka support and I wonder what
 the roadmap is for Scala 2.11 Support (including Kafka).
 
  Can we expect to see 2.11 support anytime soon?
 
  The upcoming 1.4 release (now at RC2) includes support for Kafka and
 Scala 2.11.6. It'd be great if you could give it a try. You can find the
 binaries (and staging repository including 2.11 artifacts) here:
 
   https://www.mail-archive.com/dev@spark.apache.org/msg09347.html
 

 Feedback after a couple of days:

 - I am using 1.4.0-rc4 now without problems
 - Not used Kafka support yet
 - I am using this with akka-2.3.11 and akka-http 1.0-RC3 (and
 sbt-assembly), and this has produced a dependency nightmare. I am even
 adding guava manually to the assembly because I just could not get
 sbt-assembly to stop complaining.

 I am far from a good understanding of sbt / maven internals, but it seems
 that the ‘compile’ scope set in the Spark POM for a lot of dependencies is
 somehow not honored, and the libs end up causing conflicts in sbt-assembly.
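
One common workaround in that situation (not from this thread; sbt-assembly 0.12+ syntax, and the strategy choices are assumptions) is an explicit merge strategy in build.sbt:

  assemblyMergeStrategy in assembly := {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard  // drop signatures etc.
    case _ => MergeStrategy.first                                // keep the first copy
  }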

 (I am writing this to share experience, not to complain. Thanks for the
 great work!!)

 onward...

 Jan





  iulian
 
 
  Jan
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 
  --
 
  --
  Iulian Dragos
 
  --
  Reactive Apps on the JVM
  www.typesafe.com
 


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-06-02 Thread Tathagata Das
Could you try it out with Spark 1.4 RC3?

Also pinging, Cloudera folks, they may be aware of something.

BTW, the way I have debugged memory leaks in the past is as follows.

Run with a small driver memory, say 1 GB. Periodically (maybe via a script),
take snapshots of the class histogram and also take memory dumps, say every
hour. Then compare the difference between two histos/dumps that are a few
hours apart (the more separated, the better). Diffing histos is easy.
Diffing two dumps can be done in JVisualVM; it will show the diff of the
objects that got added in the later dump. That makes it easy to debug what
is not getting cleaned up.

TD


On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Thanks for your reply. Here are the top 30 entries of the jmap -histo:live result:

  num     #instances   #bytes      class name
 ------------------------------------------------------------------------
    1:  40802  145083848  [B
    2:  99264   12716112  methodKlass
    3:  99264   12291480  constMethodKlass
    4:   84729144816  constantPoolKlass
    5:   84727625192  instanceKlassKlass
    6:   1866097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
    7:   70454804832  constantPoolCacheKlass
    8:  1391684453376  java.util.HashMap$Entry
    9:   94273542512  methodDataKlass
   10:  1413123391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
   11:  1354913251784  java.lang.Long
   12:  261922765496  [C
   13:  8131140560  [Ljava.util.HashMap$Entry;
   14:  89971061936  java.lang.Class
   15:  16022 851384  [[I
   16:  16447 789456  java.util.zip.Inflater
   17:  13855 723376  [S
   18:  17282 691280  java.lang.ref.Finalizer
   19:  25725 617400  java.lang.String
   20:  320 570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
   21:  16066 514112  java.util.concurrent.ConcurrentHashMap$HashEntry
   22:  12288 491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
   23:  13343 426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
   24:  12288 396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
   25:  16447 394728  java.util.zip.ZStreamRef
   26:  565 370080  [I
   27:  508 272288  objArrayKlassKlass
   28:  16233 259728  java.lang.Object
   29:  771 209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
   30:  2524 192312  [Ljava.lang.Object;

 But as I mentioned above, the heap memory seems OK; the extra memory is
 consumed by some off-heap data, and I can't find a way to figure out what is
 in there.

 Besides, I did some extra experiments, i.e. ran the same program in
 different environments to test whether it has the off-heap memory issue:

 spark1.0 + standalone = no
 spark1.0 + yarn = no
 spark1.3 + standalone = no
 spark1.3 + yarn = yes

 I'm using CDH5.1, so the spark1.0 is provided by cdh, and
 spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.

 I could use spark1.0 + yarn, but I can't find a way to handle the logs
 (level and rolling), so it'll explode the hard drive.

 Currently I'll stick to spark1.0 + standalone, until our ops team decides
 to upgrade cdh.



 On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com wrote:

 While it is running, is it possible for you to log into the YARN node and
 get histograms of live objects using jmap -histo:live? That may reveal
 something.


 On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Unfortunately, they're still growing, both driver and executors.

 I run the same job with local mode, everything is fine.

 On Thu, May 28, 2015 at 5:26 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Can you replace your counting part with this?

 logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))



 Thanks
 Best Regards

 On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I wrote a simple test job, it only does very basic operations. for
 example:

 val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
   Map(topic -> 1)).map(_._2)
 val logs = lines.flatMap { line =>
   try {
     Some(parse(line).extract[Impression])
   } catch {
     case _: Exception => None
   }
 }

 logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
   rdd.foreachPartition { iter =>
     iter.foreach(count => logger.info(count.toString))
   }
 }

 It receives messages from Kafka, parses the JSON, filters and counts the
 records, and then prints the counts to the logs.

 Thanks.


 On Thu, May 28, 2015 at 3:07 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 Hi Zhang,

 Could you paste your

Re: Spark updateStateByKey fails with class leak when using case classes - resend

2015-06-01 Thread Tathagata Das
Interesting, only in local[*]! In the GitHub repo you pointed to, what is the
main class that you were running?

TD


On Mon, May 25, 2015 at 9:23 AM, rsearle eggsea...@verizon.net wrote:

 Further experimentation indicates these problems only occur when master is
 local[*].

 There are no issues if a standalone cluster is used.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-updateStateByKey-fails-with-class-leak-when-using-case-classes-resend-tp22793p23020.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to monitor Spark Streaming from Kafka?

2015-06-01 Thread Tathagata Das
In the receiver-less direct approach, there is no concept of a consumer
group, as we don't use the Kafka high-level consumer (which uses ZK). Instead,
Spark Streaming manages offsets on its own, giving tighter guarantees. If
you want to monitor the progress of the processing of offsets, you will
have to update ZK yourself. With the code snippet you posted, you can get
the range of offsets that were processed in each batch, and accordingly
update ZooKeeper using a consumer group name of your choice.
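
A rough Scala sketch of doing that update (the ZK path layout follows the Kafka 0.8 high-level consumer convention; the quorum string and the "mygroup" name are assumptions):

import kafka.utils.{ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

val zkClient = new ZkClient("zkhost:2181", 30000, 30000, ZKStringSerializer)
val group = "mygroup"  // any consumer group name of your choice

directKafkaStream.foreachRDD { rdd =>
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { osr =>
    // Same path the Kafka 0.8 high-level consumer uses, so ZK-based
    // monitoring tools can pick it up
    val path = s"/consumers/$group/offsets/${osr.topic}/${osr.partition}"
    ZkUtils.updatePersistentPath(zkClient, path, osr.untilOffset.toString)
  }
}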

TD

On Mon, Jun 1, 2015 at 2:23 PM, dgoldenberg dgoldenberg...@gmail.com
wrote:

 Hi,

 What are some of the good/adopted approaches to monitoring Spark Streaming
 from Kafka? I see that there are things like
 http://quantifind.github.io/KafkaOffsetMonitor, for example. Do they all
 assume that receiver-based streaming is used?

 Then Note that one disadvantage of this approach (Receiverless Approach,
 #2) is that it does not update offsets in Zookeeper, hence Zookeeper-based
 Kafka monitoring tools will not show progress. However, you can access the
 offsets processed by this approach in each batch and update Zookeeper
 yourself.

 The code sample, however, seems sparse. What do you need to do here?

  directKafkaStream.foreachRDD(
      new Function<JavaPairRDD<String, String>, Void>() {
          @Override
          public Void call(JavaPairRDD<String, String> rdd) throws IOException {
              OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd).offsetRanges();
              // offsetRanges.length = # of Kafka partitions being consumed
              ...
              return null;
          }
      }
  );

 and if these are updated, will KafkaOffsetMonitor work?

 Monitoring seems to center around the notion of a consumer group. But in
 the receiver-less approach, the code on the Spark consumer side doesn't seem
 to expose a consumer group parameter. Where does it go? Can/should I just
 pass in group.id as part of the kafkaParams HashMap?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-monitor-Spark-Streaming-from-Kafka-tp23103.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Recommended Scala version

2015-05-31 Thread Tathagata Das
Can you file a JIRA with the detailed steps to reproduce the problem?

On Fri, May 29, 2015 at 2:59 AM, Alex Nakos ana...@gmail.com wrote:

 Hi-

 I’ve just built the latest spark RC from source (1.4.0 RC3) and can
 confirm that the spark shell is still NOT working properly on 2.11. No
 classes in the jar I've specified with the —jars argument on the command
 line are available in the REPL.


 Cheers
 Alex

 On Thu, May 28, 2015 at 8:38 AM, Tathagata Das t...@databricks.com
 wrote:

 Would be great if you guys can test out the Spark 1.4.0 RC2 (RC3 coming
 out soon) with Scala 2.11 and report issues.

 TD

 On Tue, May 26, 2015 at 9:15 AM, Koert Kuipers ko...@tresata.com wrote:

 we are still running into issues with spark-shell not working on 2.11,
 but we are running on somewhat older master so maybe that has been resolved
 already.

 On Tue, May 26, 2015 at 11:48 AM, Dean Wampler deanwamp...@gmail.com
 wrote:

 Most of the 2.11 issues are being resolved in Spark 1.4. For a while,
 the Spark project has published maven artifacts that are compiled with 2.11
 and 2.10, although the downloads at
 http://spark.apache.org/downloads.html are still all for 2.10.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Tue, May 26, 2015 at 10:33 AM, Ritesh Kumar Singh 
 riteshoneinamill...@gmail.com wrote:

 Yes, the recommended version is 2.10, as not all features are supported
 by 2.11 yet. The Kafka libraries and JDBC components are yet to be ported
 to 2.11. So if your project doesn't depend on these components,
 you can give v2.11 a try.

 Here's a link
 https://spark.apache.org/docs/1.2.0/building-spark.html#building-for-scala-211
  for
 building with 2.11 version.

 Though you won't run into any issues if you use v2.10 as of
 now. But then again, future releases will have to shift to 2.11
 once support for v2.10 ends in the long run.


 On Tue, May 26, 2015 at 8:21 PM, Punyashloka Biswal 
 punya.bis...@gmail.com wrote:

 Dear Spark developers and users,

 Am I correct in believing that the recommended version of Scala to
 use with Spark is currently 2.10? Is there any plan to switch to 2.11 in
 future? Are there any advantages to using 2.11 today?

 Regards,
 Punya









Re: Recommended Scala version

2015-05-28 Thread Tathagata Das
Would be great if you guys can test out the Spark 1.4.0 RC2 (RC3 coming out
soon) with Scala 2.11 and report issues.

TD

On Tue, May 26, 2015 at 9:15 AM, Koert Kuipers ko...@tresata.com wrote:

 we are still running into issues with spark-shell not working on 2.11, but
 we are running on somewhat older master so maybe that has been resolved
 already.

 On Tue, May 26, 2015 at 11:48 AM, Dean Wampler deanwamp...@gmail.com
 wrote:

 Most of the 2.11 issues are being resolved in Spark 1.4. For a while, the
 Spark project has published maven artifacts that are compiled with 2.11 and
 2.10, although the downloads at http://spark.apache.org/downloads.html
 are still all for 2.10.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Tue, May 26, 2015 at 10:33 AM, Ritesh Kumar Singh 
 riteshoneinamill...@gmail.com wrote:

 Yes, the recommended version is 2.10, as not all features are supported
 by 2.11 yet. The Kafka libraries and JDBC components are yet to be ported
 to 2.11. So if your project doesn't depend on these components,
 you can give v2.11 a try.

 Here's a link
 https://spark.apache.org/docs/1.2.0/building-spark.html#building-for-scala-211
  for
 building with 2.11 version.

 Though you won't run into any issues if you use v2.10 as of now.
 But then again, future releases will have to shift to 2.11 once
 support for v2.10 ends in the long run.


 On Tue, May 26, 2015 at 8:21 PM, Punyashloka Biswal 
 punya.bis...@gmail.com wrote:

 Dear Spark developers and users,

 Am I correct in believing that the recommended version of Scala to use
 with Spark is currently 2.10? Is there any plan to switch to 2.11 in
 future? Are there any advantages to using 2.11 today?

 Regards,
 Punya







Re: Spark Streaming from Kafka - no receivers and spark.streaming.receiver.maxRate?

2015-05-27 Thread Tathagata Das
You can throttle the no receiver direct Kafka stream using
spark.streaming.kafka.maxRatePerPartition
http://spark.apache.org/docs/latest/configuration.html#spark-streaming
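
For example (the value of 1000 records/sec per partition is just an illustration):

val sparkConf = new SparkConf()
  .setAppName("DirectKafkaApp")
  // cap on messages per second read from EACH Kafka partition
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")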


On Wed, May 27, 2015 at 4:34 PM, Ted Yu yuzhih...@gmail.com wrote:

 Have you seen
 http://stackoverflow.com/questions/29051579/pausing-throttling-spark-spark-streaming-application
 ?

 Cheers

 On Wed, May 27, 2015 at 4:11 PM, dgoldenberg dgoldenberg...@gmail.com
 wrote:

 Hi,

 With the no receivers approach to streaming from Kafka, is there a way to
 set something like spark.streaming.receiver.maxRate so as not to overwhelm
 the Spark consumers?

 What would be some of the ways to throttle the streamed messages so that
 the
 consumers don't run out of memory?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-from-Kafka-no-receivers-and-spark-streaming-receiver-maxRate-tp23061.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Spark Streaming - Design considerations/Knobs

2015-05-24 Thread Tathagata Das
Blocks are replicated immediately, before the driver launches any jobs
using them.

On Thu, May 21, 2015 at 2:05 AM, Hemant Bhanawat hemant9...@gmail.com
wrote:

 Honestly, given the length of my email, I didn't expect a reply. :-)
 Thanks for reading and replying. However, I have a follow-up question:

 I don't think I understand the block replication completely. Are the
 blocks replicated immediately after they are received by the receiver? Or
 are they kept only on the receiver node and moved only on shuffle? Does the
 replication have something to do with locality.wait?

 Thanks,
 Hemant

 On Thu, May 21, 2015 at 2:21 AM, Tathagata Das t...@databricks.com
 wrote:

 Correcting the ones that are incorrect or incomplete. BUT this is a good
 list of things to remember about Spark Streaming.


 On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 Hi,

 I have compiled a list (from online sources) of knobs/design
 considerations that need to be taken care of by applications running on
 spark streaming. Is my understanding correct?  Any other important design
 consideration that I should take care of?


- A DStream is associated with a single receiver. For attaining read
parallelism multiple receivers i.e. multiple DStreams need to be created.
- A receiver is run within an executor. It occupies one core. Ensure
that there are enough cores for processing after receiver slots are 
 booked
i.e. spark.cores.max should take the receiver slots into account.
- The receivers are allocated to executors in a round robin fashion.
- When data is received from a stream source, receiver creates
blocks of data.  A new block of data is generated every blockInterval
milliseconds. N blocks of data are created during the batchInterval 
 where N
= batchInterval/blockInterval.
- These blocks are distributed by the BlockManager of the current
executor to the block managers of other executors. After that, the 
 Network
Input Tracker running on the driver is informed about the block locations
for further processing.
- An RDD is created on the driver for the blocks created during the
batchInterval. The blocks generated during the batchInterval are partitions
of the RDD. Each partition is a task in Spark. blockInterval ==
batchInterval would mean that a single partition is created and probably it
is processed locally.

 The map tasks on the blocks are processed in the executors (one that
 received the block, and another where the block was replicated) that has
 the blocks irrespective of block interval, unless non-local scheduling
 kicks in (as you observed next).


- Having bigger blockinterval means bigger blocks. A high value of
spark.locality.wait increases the chance of processing a block on the 
 local
node. A balance needs to be found out between these two parameters to
ensure that the bigger blocks are processed locally.
- Instead of relying on batchInterval and blockInterval, you can
define the number of partitions by calling dstream.repartition(n). This
reshuffles the data in RDD randomly to create n number of partitions.

 Yes, for greater parallelism. Though comes at the cost of a shuffle.


- An RDD's processing is scheduled by driver's jobscheduler as a
job. At a given point of time only one job is active. So, if one job is
executing the other jobs are queued.


- If you have two dstreams there will be two RDDs formed and there
will be two jobs created which will be scheduled one after the another.


- To avoid this, you can union two dstreams. This will ensure that a
single unionRDD is formed for the two RDDs of the dstreams. This unionRDD
is then considered as a single job. However the partitioning of the RDDs 
 is
not impacted.

 To further clarify, the jobs depend on the number of output operations
 (print, foreachRDD, saveAsXFiles) and the number of RDD actions in those
 output operations.

 dstream1.union(dstream2).foreachRDD { rdd => rdd.count() }  // ONE Spark job per batch

 dstream1.union(dstream2).foreachRDD { rdd => { rdd.count(); rdd.count() } }  // TWO Spark jobs per batch

 dstream1.foreachRDD { rdd => rdd.count } ; dstream2.foreachRDD { rdd => rdd.count }  // TWO Spark jobs per batch






-
- If the batch processing time is more than the batch interval, then
obviously the receiver's memory will start filling up and it will end up
throwing exceptions (most probably BlockNotFoundException). Currently there
is no way to pause the receiver.

 You can limit the rate of receiver using SparkConf config
 spark.streaming.receiver.maxRate


-
- For being fully fault tolerant, spark streaming needs to enable
checkpointing. Checkpointing increases the batch processing time.

 Incomplete. There are two types of checkpointing - data and metadata.
 Only data checkpointing, needed by only some operations, increases batch

Re: Trying to connect to many topics with several DirectConnect

2015-05-22 Thread Tathagata Das
Can you show us the rest of the program? When are you starting, or stopping
the context. Is the exception occuring right after start or stop? What
about log4j logs, what does it say?

On Fri, May 22, 2015 at 7:12 AM, Cody Koeninger c...@koeninger.org wrote:

 I just verified that the following code works on 1.3.0 :

 val stream1 = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaParams, topic1)

 val stream2 = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaParams, topic2)

 stream1.print()

 stream2.print()


 So something else is probably going on in your case.  See if simply
 printing the two streams works for you, then compare whats different in
 your actual job.

 On Fri, May 22, 2015 at 6:50 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 Hi,

 I'm trying to connect to two Kafka topics with Spark using DirectStream,
 but I get an error. I don't know if there is any limitation on doing this,
 because when I access just one topic everything is fine.

 val ssc = new StreamingContext(sparkConf, Seconds(5))
 val kafkaParams = Map[String, String]("metadata.broker.list" -> "quickstart.cloudera:9092")
 val setTopic1 = Set("topic1")
 val setTopic2 = Set("topic2")

 val stream1 = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaParams, setTopic1)
 val stream2 = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaParams, setTopic2)


 The error that I get is:
 15/05/22 13:12:40 INFO BlockManager: Removing RDD 314
 15/05/22 13:12:40 ERROR OneForOneStrategy:
 java.lang.NullPointerException
 at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
 at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
 at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
 at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
 at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
 at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)


 Is there any limitation on doing this?





Re: Storing spark processed output to Database asynchronously.

2015-05-22 Thread Tathagata Das
Something does not make sense. Receivers (currently) do not get blocked
(unless a rate limit has been set) due to processing load. The receiver will
continue to receive data and store it in memory until it is processed.
So I am still not sure how the data loss is happening, unless you are
sending data at a faster rate than the receiver can handle (that is, more
than the max rate at which the receiver can save data in memory and
replicate it to other nodes).

In general, if you are particular about data loss, then UDP is not really a
good choice in the first place. If you can use TCP, try it. It would
at least eliminate the possibility I mentioned above. Ultimately, if
you try sending data faster than the receiver can handle (independent of
whether processing can keep up), then you will lose data if you are using
UDP. You have to use TCP to naturally throttle the sending rate to match the
receiving rate of the receiver, without dropping data.
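
For reference, a skeleton of a TCP-based custom receiver (a sketch under the assumption that the sender writes newline-delimited records; host and port are placeholders):

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class TcpLineReceiver(host: String, port: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  override def onStart(): Unit = {
    new Thread("tcp-line-receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = {}  // receive() exits once isStopped is true

  private def receive(): Unit = {
    val socket = new Socket(host, port)
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    var line = reader.readLine()
    while (!isStopped && line != null) {
      store(line)  // with TCP, a slow consumer backpressures the sender
      line = reader.readLine()
    }
    socket.close()
    restart("Connection closed, retrying")
  }
}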


On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj gautam1...@gmail.com wrote:

 This is just a friendly ping, just to remind you of my query.

 Also, is there a possible explanation/example on the usage of
 AsyncRDDActions in Java ?

 On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 I am received data at UDP port 8060 and doing processing on it using
 Spark and storing the output in Neo4j.

 But the data I'm receiving and the data that is getting stored don't
 match, probably because the Neo4j API takes too long to push the data into
 the database. Meanwhile, Spark is unable to receive data, probably because
 the process is blocked.

 On Thu, May 21, 2015 at 5:28 PM, Tathagata Das t...@databricks.com
 wrote:

 Can you elaborate on how the data loss is occurring?


 On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 That is completely alright, as the system will make sure the work gets
 done.

 My major concern is the data drop. Will using async stop data loss?

 On Thu, May 21, 2015 at 4:55 PM, Tathagata Das t...@databricks.com
 wrote:

 If you cannot push data as fast as you are generating it, then async isn't
 going to help either. The work is just going to keep piling up as
 many, many async jobs, even though your batch processing times will be low,
 as that processing time is not going to reflect how much of the overall
 work is pending in the system.

 On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 Hi,

 From my understanding of Spark Streaming, I created a spark entry
 point, for continuous UDP data, using:

  SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
  JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1));
  JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver(8060));

 Now, when I process this input stream using:

  JavaDStream<String> hash = lines.flatMap(my-code);
  JavaPairDStream tuple = hash.mapToPair(my-code);
  JavaPairDStream output = tuple.reduceByKey(my-code);

  output.foreachRDD(
      new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
          @Override
          public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1) throws Exception {
              new AsyncRDDActions(arg0.rdd(), null);
              arg0.foreachPartition(
                  new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {

                      @Override
                      public void call(Iterator<Tuple2<String, ArrayList<String>>> arg0) throws Exception {
                          GraphDatabaseService graphDb = new GraphDatabaseFactory()
                                  .newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
                                  .setConfig("remote_shell_enabled", "true")
                                  .newGraphDatabase();

                          try (Transaction tx = graphDb.beginTx()) {
                              while (arg0.hasNext()) {
                                  Tuple2<String, ArrayList<String>> tuple = arg0.next();
                                  Node HMac = Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
                                  boolean oldHMac = false;
                                  if (HMac != null) {
                                      System.out.println("Already in Database: " + tuple._1);
                                      oldHMac = true

Re: [Streaming] Non-blocking recommendation in custom receiver documentation and KinesisReceiver's worker.run blocking calll

2015-05-21 Thread Tathagata Das
Thanks for the JIRA. I will look into this issue.

TD

On Thu, May 21, 2015 at 1:31 AM, Aniket Bhatnagar 
aniket.bhatna...@gmail.com wrote:

 I ran into one of the issues that are potentially caused because of this
 and have logged a JIRA bug -
 https://issues.apache.org/jira/browse/SPARK-7788

 Thanks,
 Aniket

 On Wed, Sep 24, 2014 at 12:59 PM Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 Hi all

 Reading through Spark Streaming's custom receiver documentation, it is
 recommended that the onStart and onStop methods should not block indefinitely.
 However, looking at the source code of KinesisReceiver, the onStart method
 calls worker.run, which blocks until the worker is shut down (via a call to
 onStop).

 So, my question is: what are the ramifications of making a blocking call
 in onStart, and is this something that should be addressed
 in the KinesisReceiver implementation?

 Thanks,
 Aniket




Re: Storing spark processed output to Database asynchronously.

2015-05-21 Thread Tathagata Das
If you cannot push data as fast as you are generating it, then async isn't
going to help either. The work is just going to keep piling up as many,
many async jobs, even though your batch processing times will be low, as that
processing time is not going to reflect how much of the overall work is
pending in the system.

On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj gautam1...@gmail.com wrote:

 Hi,

 From my understanding of Spark Streaming, I created a spark entry point,
 for continuous UDP data, using:

  SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
  JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1));
  JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver(8060));

 Now, when I process this input stream using:

  JavaDStream<String> hash = lines.flatMap(my-code);
  JavaPairDStream tuple = hash.mapToPair(my-code);
  JavaPairDStream output = tuple.reduceByKey(my-code);

  output.foreachRDD(
      new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
          @Override
          public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1) throws Exception {
              new AsyncRDDActions(arg0.rdd(), null);
              arg0.foreachPartition(
                  new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {

                      @Override
                      public void call(Iterator<Tuple2<String, ArrayList<String>>> arg0) throws Exception {
                          GraphDatabaseService graphDb = new GraphDatabaseFactory()
                                  .newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
                                  .setConfig("remote_shell_enabled", "true")
                                  .newGraphDatabase();

                          try (Transaction tx = graphDb.beginTx()) {
                              while (arg0.hasNext()) {
                                  Tuple2<String, ArrayList<String>> tuple = arg0.next();
                                  Node HMac = Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
                                  boolean oldHMac = false;
                                  if (HMac != null) {
                                      System.out.println("Already in Database: " + tuple._1);
                                      oldHMac = true;
                                  }
                                  else
                                      HMac = Neo4jOperations.createHMac(graphDb, tuple._1);

                                  ArrayList<String> zipcodes = tuple._2;
                                  for (String zipcode : zipcodes) {
                                      Node Zipcode = Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
                                      if (Zipcode != null) {
                                          System.out.println("Already in Database: " + zipcode);
                                          if (oldHMac == true && Neo4jOperations.getRelationshipBetween(HMac, Zipcode) != null)
                                              Neo4jOperations.updateToCurrentTime(HMac, Zipcode);
                                          else
                                              Neo4jOperations.travelTo(HMac, Zipcode);
                                      }
                                      else {
                                          Zipcode = Neo4jOperations.createZipcode(graphDb, zipcode);
                                          Neo4jOperations.travelTo(HMac, Zipcode);
                                      }
                                  }
                              }
                              tx.success();
                          }
                          graphDb.shutdown();
                      }
                  });
              return null;
          }
      });

 The part of the code in output.foreachRDD pushes the output of Spark into
 the Neo4j database, checking for duplicate values.

 This part of the code is very time-consuming, because of which my processing
 time exceeds the batch time. Because of that, it 

Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-05-21 Thread Tathagata Das
Looks like the file size reported by the FSInputDStream of
Tachyon's FileSystem interface is somehow zero.

On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Just to follow up this thread further .

 I was doing some fault-tolerance testing of Spark Streaming with Tachyon as
 the OFF_HEAP block store. As I said in an earlier email, I was able to solve
 the BlockNotFound exception when I used Tachyon's Hierarchical Storage,
 which is good.

 I continued doing some testing around storing the Spark Streaming WAL and
 checkpoint files in Tachyon as well. Here are a few findings:


 When I store the Spark Streaming checkpoint location in Tachyon, the
 throughput is much higher. I tested the Driver and Receiver failure cases,
 and Spark Streaming is able to recover without any data loss on Driver
 failure.

 *But on Receiver failure, Spark Streaming loses data*, as I see an
 exception while reading the WAL file from the Tachyon receivedData location
 for the same Receiver id which just failed.

 If I change the checkpoint location back to HDFS, Spark Streaming can
 recover from both Driver and Receiver failure.

 Here are the log details from when the Spark Streaming receiver failed. I
 raised a JIRA for the same issue: https://issues.apache.org/jira/browse/SPARK-7525



 INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2 (epoch
 1)*
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
 remove executor 2 from BlockManagerMaster.
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
 block manager BlockManagerId(2, 10.252.5.54, 45789)
 INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
 successfully in removeExecutor
 INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered
 receiver for stream 2 from 10.252.5.62*:47255
 WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage
 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could not
 read data from write ahead log record
 FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
 http://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)*
 at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
 $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
 at scala.Option.getOrElse(Option.scala:120)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 Caused by: java.lang.IllegalArgumentException:* Seek position is past
 EOF: 645603894, fileSize = 0*
 at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
 at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
 at
 org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
 at
 org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
 at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
 $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
 ... 15 more

 INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in
 stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
 INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in stage
 103.0 (TID 422) on executor 10.252.5.61: org.apache.spark.SparkException
 (Could not read data from write ahead log record
 FileBasedWriteAheadLogSegment(tachyon-ft://
 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
 [duplicate 1]
 INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3 in
 stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
 INFO : org.apache.spark.deploy.client.AppClient$ClientActor - 

Re: Connecting to an inmemory database from Spark

2015-05-21 Thread Tathagata Das
This doesn't seem like a Cassandra-specific issue. Could you give us more
information (code, errors, stack traces)?


On Thu, May 21, 2015 at 1:33 PM, tshah77 tejasrs...@gmail.com wrote:

 TD,

 Do you have any example of reading from Cassandra using Spark Streaming
 in Java?

 I am trying to connect to Cassandra using Spark Streaming, and it is
 throwing an error: "could not parse master URL".

 Thanks
 Tejas



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-to-an-inmemory-database-from-Spark-tp1343p22979.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Streaming - Design considerations/Knobs

2015-05-20 Thread Tathagata Das
Correcting the ones that are incorrect or incomplete. BUT this is a good list
of things to remember about Spark Streaming.


On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat hemant9...@gmail.com
wrote:

 Hi,

 I have compiled a list (from online sources) of knobs/design
 considerations that need to be taken care of by applications running on
 spark streaming. Is my understanding correct?  Any other important design
 consideration that I should take care of?


- A DStream is associated with a single receiver. For attaining read
parallelism multiple receivers i.e. multiple DStreams need to be created.
- A receiver is run within an executor. It occupies one core. Ensure
that there are enough cores for processing after receiver slots are booked
i.e. spark.cores.max should take the receiver slots into account.
- The receivers are allocated to executors in a round robin fashion.
- When data is received from a stream source, receiver creates blocks
of data.  A new block of data is generated every blockInterval
milliseconds. N blocks of data are created during the batchInterval where N
= batchInterval/blockInterval.
- These blocks are distributed by the BlockManager of the current
executor to the block managers of other executors. After that, the Network
Input Tracker running on the driver is informed about the block locations
for further processing.
- An RDD is created on the driver for the blocks created during the
batchInterval. The blocks generated during the batchInterval are partitions
of the RDD. Each partition is a task in Spark. blockInterval ==
batchInterval would mean that a single partition is created and probably it
is processed locally.

 The map tasks on the blocks are processed in the executors (one that
received the block, and another where the block was replicated) that has
the blocks irrespective of block interval, unless non-local scheduling
kicks in (as you observed next).


- Having bigger blockinterval means bigger blocks. A high value of
spark.locality.wait increases the chance of processing a block on the local
node. A balance needs to be found out between these two parameters to
ensure that the bigger blocks are processed locally.
- Instead of relying on batchInterval and blockInterval, you can
define the number of partitions by calling dstream.repartition(n). This
reshuffles the data in RDD randomly to create n number of partitions.

 Yes, for greater parallelism. Though comes at the cost of a shuffle.


- An RDD's processing is scheduled by driver's jobscheduler as a job.
At a given point of time only one job is active. So, if one job is
executing the other jobs are queued.


- If you have two dstreams there will be two RDDs formed and there
will be two jobs created which will be scheduled one after the another.


- To avoid this, you can union two dstreams. This will ensure that a
single unionRDD is formed for the two RDDs of the dstreams. This unionRDD
is then considered as a single job. However the partitioning of the RDDs is
not impacted.

 To further clarify, the jobs depend on the number of output operations
(print, foreachRDD, saveAsXFiles) and the number of RDD actions in those
output operations.

dstream1.union(dstream2).foreachRDD { rdd => rdd.count() }  // ONE Spark job per batch

dstream1.union(dstream2).foreachRDD { rdd => { rdd.count(); rdd.count() } }  // TWO Spark jobs per batch

dstream1.foreachRDD { rdd => rdd.count } ; dstream2.foreachRDD { rdd => rdd.count }  // TWO Spark jobs per batch






-
- If the batch processing time is more than the batch interval, then
obviously the receiver's memory will start filling up and it will end up
throwing exceptions (most probably BlockNotFoundException). Currently there
is no way to pause the receiver.

 You can limit the rate of receiver using SparkConf config
spark.streaming.receiver.maxRate
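
For example (500 records/sec is just an illustration):

sparkConf.set("spark.streaming.receiver.maxRate", "500")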


-
- For being fully fault tolerant, spark streaming needs to enable
checkpointing. Checkpointing increases the batch processing time.

 Incomplete. There are two types of checkpointing - data and metadata. Only
data checkpointing, needed by only some operations, increases batch
processing time. Read -
http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
Furthermore, with checkpointing you can recover the computation, but you may
lose some data (that was received but not processed before the driver failed)
for some sources. Enabling write-ahead logs and a reliable source + receiver
allows zero data loss. Read - WAL in
http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics


- The frequency of metadata checkpoint cleaning can be controlled
using spark.cleaner.ttl. But, data checkpoint cleaning happens
automatically when the RDDs in the checkpoint are no more required.


 Incorrect. metadata checkpointing or 

Re: Spark Streaming graceful shutdown in Spark 1.4

2015-05-20 Thread Tathagata Das
If you are talking about handling driver crash failures, then all bets are
off anyway! Adding a shutdown hook in the hope of handling driver process
failure handles only some cases (Ctrl-C), but does not handle cases like
SIGKILL (which does not run JVM shutdown hooks) or a driver machine crash.
So it's not a good idea to rely on that.

Nonetheless, I have opened a PR to handle the shutdown of the
StreamingContext in the same way as the SparkContext:
https://github.com/apache/spark/pull/6307


On Tue, May 19, 2015 at 12:51 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Thanks Sean, you are right. If the driver program is running, then I can
 handle shutdown in the main exit path. But if the driver machine is crashed
 (or if you just stop the application, for example by killing the driver
 process), then a shutdown hook is the only option, isn't it? What I am
 trying to say is that just doing ssc.stop in sys.ShutdownHookThread or
 Runtime.getRuntime().addShutdownHook (in Java) won't work anymore. I need
 to use Utils.addShutdownHook with a priority. So I am just checking whether
 Spark Streaming can make graceful shutdown the default shutdown mechanism.

 Dibyendu

 On Tue, May 19, 2015 at 1:03 PM, Sean Owen so...@cloudera.com wrote:

 I don't think you should rely on a shutdown hook. Ideally you try to
 stop it in the main exit path of your program, even in case of an
 exception.

 On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya
 dibyendu.bhattach...@gmail.com wrote:
  You mean to say that within Runtime.getRuntime().addShutdownHook I call
  ssc.stop(stopSparkContext = true, stopGracefully = true)?
 
  This won't work anymore in 1.4.
 
  The SparkContext got stopped before Receiver processed all received
 blocks
  and I see below exception in logs. But if I add the
 Utils.addShutdownHook
  with the priority as I mentioned , then only graceful shutdown works .
 In
  that case shutdown-hook run in priority order.
 





Re: [Unit Test Failure] Test org.apache.spark.streaming.JavaAPISuite.testCount failed

2015-05-20 Thread Tathagata Das
Has this been fixed for you now? There have been a number of patches since
then and it may have been fixed.

On Thu, May 14, 2015 at 7:20 AM, Wangfei (X) wangf...@huawei.com wrote:

  Yes, it fails repeatedly on my local Jenkins.

 Sent from my iPhone

 On May 14, 2015, at 18:30, Tathagata Das t...@databricks.com wrote:

   Do you get this failure repeatedly?



 On Thu, May 14, 2015 at 12:55 AM, kf wangf...@huawei.com wrote:

 Hi all, I got the following error when I ran Spark's unit tests via
 dev/run-tests
 on the latest branch-1.4.

 the latest commit id:
 commit d518c0369fa412567855980c3f0f426cde5c190d
 Author: zsxwing zsxw...@gmail.com
 Date:   Wed May 13 17:58:29 2015 -0700

 error

 [info] Test org.apache.spark.streaming.JavaAPISuite.testCount started
 [error] Test org.apache.spark.streaming.JavaAPISuite.testCount failed:
 org.apache.spark.SparkException: Error communicating with MapOutputTracker
 [error] at
 org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
 [error] at
 org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:119)
 [error] at
 org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:324)
 [error] at org.apache.spark.SparkEnv.stop(SparkEnv.scala:93)
 [error] at org.apache.spark.SparkContext.stop(SparkContext.scala:1577)
 [error] at

 org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:626)
 [error] at

 org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:597)
 [error] at

 org.apache.spark.streaming.TestSuiteBase$class.runStreamsWithPartitions(TestSuiteBase.scala:403)
 [error] at

 org.apache.spark.streaming.JavaTestUtils$.runStreamsWithPartitions(JavaTestUtils.scala:102)
 [error] at

 org.apache.spark.streaming.TestSuiteBase$class.runStreams(TestSuiteBase.scala:344)
 [error] at

 org.apache.spark.streaming.JavaTestUtils$.runStreams(JavaTestUtils.scala:102)
 [error] at

 org.apache.spark.streaming.JavaTestBase$class.runStreams(JavaTestUtils.scala:74)
 [error] at

 org.apache.spark.streaming.JavaTestUtils$.runStreams(JavaTestUtils.scala:102)
 [error] at
 org.apache.spark.streaming.JavaTestUtils.runStreams(JavaTestUtils.scala)
 [error] at
 org.apache.spark.streaming.JavaAPISuite.testCount(JavaAPISuite.java:103)
 [error] ...
 [error] Caused by: org.apache.spark.SparkException: Error sending message
 [message = StopMapOutputTracker]
 [error] at
 org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
 [error] at
 org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
 [error] at
 org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:109)
 [error] ... 52 more
 [error] Caused by: java.util.concurrent.TimeoutException: Futures timed
 out
 after [120 seconds]
 [error] at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 [error] at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 [error] at
 scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 [error] at

 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 [error] at scala.concurrent.Await$.result(package.scala:107)
 [error] at
 org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
 [error] ... 54 more



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Unit-Test-Failure-Test-org-apache-spark-streaming-JavaAPISuite-testCount-failed-tp22879.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Spark Streaming graceful shutdown in Spark 1.4

2015-05-19 Thread Tathagata Das
If you wanted to stop it gracefully, then why are you not calling
ssc.stop(stopGracefully = true, stopSparkContext = true)? Then it doesn't
matter whether the shutdown hook was called or not.

TD

On Mon, May 18, 2015 at 9:43 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Hi,

  Just figured out that if I want to perform a graceful shutdown of Spark
  Streaming 1.4 (from master), Runtime.getRuntime().addShutdownHook no
  longer works. In Spark 1.4 there is a Utils.addShutdownHook defined for
  Spark Core that gets called anyway, which makes graceful shutdown of
  Spark Streaming fail with errors like the "SparkContext already closed" issue.

  To solve this, I need to explicitly add Utils.addShutdownHook in my
  driver with a higher priority (say 150) than Spark's shutdown priority of
  50, and there I call the StreamingContext stop method with the (false, true)
  parameters.

  Just curious to know if this is how we need to handle shutdown hooks
  going forward?

  Can't we make graceful shutdown the default for streaming shutdown?

 Also, the Java API for adding a shutdown hook in Utils looks very dirty, with
 methods like this:



 Utils.addShutdownHook(150, new Function0<BoxedUnit>() {
   @Override
   public BoxedUnit apply() {
     return null;
   }

   @Override
   public byte apply$mcB$sp() {
     return 0;
   }

   @Override
   public char apply$mcC$sp() {
     return 0;
   }

   @Override
   public double apply$mcD$sp() {
     return 0;
   }

   @Override
   public float apply$mcF$sp() {
     return 0;
   }

   @Override
   public int apply$mcI$sp() {
     return 0;
   }

   @Override
   public long apply$mcJ$sp() {
     return 0;
   }

   @Override
   public short apply$mcS$sp() {
     return 0;
   }

   @Override
   public void apply$mcV$sp() {
     jsc.stop(false, true);  // the only override that does the real work
   }

   @Override
   public boolean apply$mcZ$sp() {
     return false;
   }
 });
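
 For comparison, a minimal Scala sketch of the same hook (illustrative only:
 Utils is private[spark], so this compiles only from code placed inside an
 org.apache.spark package; it is not a supported public API):

 // sketch; assumes access to the private[spark] Utils object
 Utils.addShutdownHook(150) { () =>
   ssc.stop(stopSparkContext = false, stopGracefully = true)  // run before Spark's own hooks
 }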



Re: [SparkStreaming] Is it possible to delay the start of some DStream in the application?

2015-05-19 Thread Tathagata Das
If you want the fileStream to start only after a certain event has
happened, why not start the StreamingContext after that event?

TD
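
A minimal sketch of that approach (waitForReferenceData is a hypothetical
placeholder for the application's own readiness check; the path is illustrative):

val ssc = new StreamingContext(conf, Seconds(10))
val files = ssc.textFileStream("hdfs:///input/dir")
files.foreachRDD { rdd => println(rdd.count()) }  // illustrative processing

waitForReferenceData()  // hypothetical: block until the reference data is initialized
ssc.start()             // no DStream computation happens before this call
ssc.awaitTermination()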

On Sun, May 17, 2015 at 7:51 PM, Haopu Wang hw...@qilinsoft.com wrote:

  I want to use a file stream as input. And I looked at the Spark Streaming
 document again; it says a file stream doesn't need a receiver at all.

 So I'm wondering if I can control a specific DStream instance.


  --

 *From:* Evo Eftimov [mailto:evo.efti...@isecc.com]
 *Sent:* Monday, May 18, 2015 12:39 AM
 *To:* 'Akhil Das'; Haopu Wang
 *Cc:* 'user'
 *Subject:* RE: [SparkStreaming] Is it possible to delay the start of some
 DStream in the application?



 You can make ANY *standard* receiver sleep by implementing a custom
 Message Deserializer class with a sleep method inside it.



 *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
 *Sent:* Sunday, May 17, 2015 4:29 PM
 *To:* Haopu Wang
 *Cc:* user
 *Subject:* Re: [SparkStreaming] Is it possible to delay the start of some
 DStream in the application?



 Why not just trigger your batch job with that event?



 If you really need streaming, then you can create a custom receiver and
 make the receiver sleep till the event has happened. That will obviously
 run your streaming pipelines without having any data to process.


   Thanks

 Best Regards



 On Fri, May 15, 2015 at 4:39 AM, Haopu Wang hw...@qilinsoft.com wrote:

 In my application, I want to start a DStream computation only after a
 special event has happened (for example, I want to start the receiver
 only after the reference data has been properly initialized).

 My question is: it looks like the DStream will be started right after
 the StreamingContext has been started. Is it possible to delay the start
 of a specific DStream?

 Thank you very much!

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond

2015-05-16 Thread Tathagata Das
For the Spark Streaming app, if you want a particular action inside a
foreachRDD to go to a particular pool, then make sure you set the pool
within the foreachRDD function. E.g.

dstream.foreachRDD { rdd =>
  rdd.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")  // set the pool
  rdd.count()  // or whatever job
}

This will ensure that the jobs will be allocated to the desired pool. LMK
if this works.

TD
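
For context, a hedged sketch of the app-level configuration that goes with
this (file path and pool name are illustrative):

val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")  // defines pool1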

On Fri, May 15, 2015 at 11:26 AM, Richard Marscher rmarsc...@localytics.com
 wrote:

 It's not a Spark Streaming app, so sorry I'm not sure of the answer to
 that. I would assume it should work.

 On Fri, May 15, 2015 at 2:22 PM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 Ok thanks a lot for clarifying that – btw was your application a Spark
 Streaming App – I am also looking for confirmation that FAIR scheduling is
 supported for Spark Streaming Apps



 *From:* Richard Marscher [mailto:rmarsc...@localytics.com]
 *Sent:* Friday, May 15, 2015 7:20 PM
 *To:* Evo Eftimov
 *Cc:* Tathagata Das; user
 *Subject:* Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond



 The doc is a bit confusing IMO, but at least for my application I had to
 use a fair pool configuration to get my stages to be scheduled with FAIR.



 On Fri, May 15, 2015 at 2:13 PM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 No pools for the moment – each of the apps uses the straightforward
 way, with the Spark conf param for scheduling = FAIR



 Spark is running in a Standalone Mode



 Are you saying that Configuring Pools is mandatory to get the FAIR
 scheduling working – from the docs it seemed optional to me



 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Friday, May 15, 2015 6:45 PM
 *To:* Evo Eftimov
 *Cc:* user
 *Subject:* Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond



 How are you configuring the fair scheduler pools?



 On Fri, May 15, 2015 at 8:33 AM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 I have run / submitted a few Spark Streaming apps configured with Fair
 scheduling on Spark Streaming 1.2.0, however they still run in a FIFO
 mode.
 Is FAIR scheduling supported at all for Spark Streaming apps and from what
 release / version - e.g. 1.3.1




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Fair-Scheduler-for-Spark-Streaming-1-2-and-beyond-tp22902.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org









Re: Multiple Kinesis Streams in a single Streaming job

2015-05-14 Thread Tathagata Das
What is the error you are seeing?

TD

On Thu, May 14, 2015 at 9:00 AM, Erich Ess er...@simplerelevance.com
wrote:

 Hi,

 Is it possible to set up streams from multiple Kinesis streams and process
 them in a single job?  From what I have read, this should be possible;
 however, the Kinesis layer errors out whenever I try to receive from more
 than a single Kinesis stream.

 Here is the code.  Currently, I am focused on just getting receivers set up
 and working for the two Kinesis streams; as such, this code just attempts
 to print out the contents of both streams:

 implicit val formats = Serialization.formats(NoTypeHints)

 val conf = new SparkConf().setMaster("local[*]").setAppName("test")
 val ssc = new StreamingContext(conf, Seconds(1))

 val rawStream = KinesisUtils.createStream(ssc, "erich-test",
   "kinesis.us-east-1.amazonaws.com", Duration(1000),
   InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY)
 rawStream.map(msg => new String(msg)).print()

 val loaderStream = KinesisUtils.createStream(
   ssc,
   "dev-loader",
   "kinesis.us-east-1.amazonaws.com",
   Duration(1000),
   InitialPositionInStream.TRIM_HORIZON,
   StorageLevel.MEMORY_ONLY)

 val loader = loaderStream.map(msg => new String(msg)).print()

 ssc.start()

 Thanks,
 -Erich



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-Kinesis-Streams-in-a-single-Streaming-job-tp22889.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Multiple Kinesis Streams in a single Streaming job

2015-05-14 Thread Tathagata Das
A possible problem may be that the Kinesis stream in 1.3 uses the
SparkContext app name as the Kinesis Application Name, which is used by the
Kinesis Client Library to save checkpoints in DynamoDB. Since both Kinesis
DStreams are using the same Kinesis application name (as they are in the same
StreamingContext / SparkContext / Spark app name), KCL may be weirdly
overwriting the checkpoint information of both Kinesis streams into the same
DynamoDB table. Either way, this is going to be fixed in Spark 1.4.

On Thu, May 14, 2015 at 4:10 PM, Chris Fregly ch...@fregly.com wrote:

 have you tried to union the 2 streams per the KinesisWordCountASL example
 https://github.com/apache/spark/blob/branch-1.3/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L120
  where
 2 streams (against the same Kinesis stream in this case) are created and
 union'd?

 it should work the same way - including union() of streams from totally
 different source types (kafka, kinesis, flume).
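
 A minimal sketch of that union pattern, reusing the two streams from the
 code quoted below:

 val unioned = rawStream.union(loaderStream)  // or: ssc.union(Seq(rawStream, loaderStream))
 unioned.map(msg => new String(msg)).print()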



 On Thu, May 14, 2015 at 2:07 PM, Tathagata Das t...@databricks.com
 wrote:

 What is the error you are seeing?

 TD

 On Thu, May 14, 2015 at 9:00 AM, Erich Ess er...@simplerelevance.com
 wrote:

 Hi,

 Is it possible to set up streams from multiple Kinesis streams and process
 them in a single job?  From what I have read, this should be possible;
 however, the Kinesis layer errors out whenever I try to receive from more
 than a single Kinesis stream.

 Here is the code.  Currently, I am focused on just getting receivers
 set up
 and working for the two Kinesis streams; as such, this code just
 attempts to
 print out the contents of both streams:

 implicit val formats = Serialization.formats(NoTypeHints)

 val conf = new SparkConf().setMaster("local[*]").setAppName("test")
 val ssc = new StreamingContext(conf, Seconds(1))

 val rawStream = KinesisUtils.createStream(ssc, "erich-test",
   "kinesis.us-east-1.amazonaws.com", Duration(1000),
   InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY)
 rawStream.map(msg => new String(msg)).print()

 val loaderStream = KinesisUtils.createStream(
   ssc,
   "dev-loader",
   "kinesis.us-east-1.amazonaws.com",
   Duration(1000),
   InitialPositionInStream.TRIM_HORIZON,
   StorageLevel.MEMORY_ONLY)

 val loader = loaderStream.map(msg => new String(msg)).print()

 ssc.start()

 Thanks,
 -Erich



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-Kinesis-Streams-in-a-single-Streaming-job-tp22889.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge

2015-05-14 Thread Tathagata Das
It would be good if you could tell me what I should add to the documentation to
make it easier to understand. I can update the docs for the 1.4.0 release.

On Tue, May 12, 2015 at 9:52 AM, Lee McFadden splee...@gmail.com wrote:

 Thanks for explaining Sean and Cody, this makes sense now.  I'd like to
 help improve this documentation so other python users don't run into the
 same thing, so I'll look into that today.

 On Tue, May 12, 2015 at 9:44 AM Cody Koeninger c...@koeninger.org wrote:

 One of the packages just contains the streaming-kafka code.  The other
 contains that code, plus everything it depends on.  That's what assembly
 typically means in JVM land.

 Java/Scala users are accustomed to using their own build tool to include
 necessary dependencies.  JVM dependency management is (thankfully)
 different from Python dependency management.

 As far as I can tell, there is no core issue, upstream or otherwise.






 On Tue, May 12, 2015 at 11:39 AM, Lee McFadden splee...@gmail.com
 wrote:

 Thanks again for all the help folks.

 I can confirm that simply switching to `--packages
 org.apache.spark:spark-streaming-kafka-assembly_2.10:1.3.1` makes
 everything work as intended.
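
 For example, a submission along these lines (script name illustrative):

 spark-submit --packages org.apache.spark:spark-streaming-kafka-assembly_2.10:1.3.1 my_streaming_job.py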

 I'm not sure what the difference is between the two packages honestly,
 or why one should be used over the other, but the documentation is
 currently not intuitive in this matter.  If you follow the instructions,
 initially it will seem broken.  Is there any reason why the docs for Python
 users (or, in fact, all users - Java/Scala users will run into this too
 except they are armed with the ability to build their own jar with the
 dependencies included) should not be changed to using the assembly package
 by default?

 Additionally, after a few google searches yesterday combined with your
 help I'm wondering if the core issue is upstream in Kafka's dependency
 chain?

 On Tue, May 12, 2015 at 8:53 AM Ted Yu yuzhih...@gmail.com wrote:

 bq. it is already in the assembly

 Yes. Verified:

 $ jar tvf ~/Downloads/spark-streaming-kafka-assembly_2.10-1.3.1.jar | grep 
 yammer | grep Gauge
   1329 Sat Apr 11 04:25:50 PDT 2015 com/yammer/metrics/core/Gauge.class


 On Tue, May 12, 2015 at 8:05 AM, Sean Owen so...@cloudera.com wrote:

 It doesn't depend directly on yammer metrics; Kafka does. It wouldn't
 be correct to declare that it does; it is already in the assembly
 anyway.

 On Tue, May 12, 2015 at 3:50 PM, Ted Yu yuzhih...@gmail.com wrote:
  Currently external/kafka/pom.xml doesn't cite yammer metrics as a
 dependency.
 
  $ ls -l
 
 ~/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar
  -rw-r--r--  1 tyu  staff  82123 Dec 17  2013
 
 /Users/tyu/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar
 
  Including the metrics-core jar would not increase the size of the
 final
  release artifact much.
 
  My two cents.






Re: [Unit Test Failure] Test org.apache.spark.streaming.JavaAPISuite.testCount failed

2015-05-14 Thread Tathagata Das
Do you get this failure repeatedly?



On Thu, May 14, 2015 at 12:55 AM, kf wangf...@huawei.com wrote:

 Hi all, I got the following error when I ran Spark's unit tests via
 dev/run-tests
 on the latest branch-1.4.

 the latest commit id:
 commit d518c0369fa412567855980c3f0f426cde5c190d
 Author: zsxwing zsxw...@gmail.com
 Date:   Wed May 13 17:58:29 2015 -0700

 error

 [info] Test org.apache.spark.streaming.JavaAPISuite.testCount started
 [error] Test org.apache.spark.streaming.JavaAPISuite.testCount failed:
 org.apache.spark.SparkException: Error communicating with MapOutputTracker
 [error] at
 org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
 [error] at
 org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:119)
 [error] at
 org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:324)
 [error] at org.apache.spark.SparkEnv.stop(SparkEnv.scala:93)
 [error] at org.apache.spark.SparkContext.stop(SparkContext.scala:1577)
 [error] at

 org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:626)
 [error] at

 org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:597)
 [error] at

 org.apache.spark.streaming.TestSuiteBase$class.runStreamsWithPartitions(TestSuiteBase.scala:403)
 [error] at

 org.apache.spark.streaming.JavaTestUtils$.runStreamsWithPartitions(JavaTestUtils.scala:102)
 [error] at

 org.apache.spark.streaming.TestSuiteBase$class.runStreams(TestSuiteBase.scala:344)
 [error] at

 org.apache.spark.streaming.JavaTestUtils$.runStreams(JavaTestUtils.scala:102)
 [error] at

 org.apache.spark.streaming.JavaTestBase$class.runStreams(JavaTestUtils.scala:74)
 [error] at

 org.apache.spark.streaming.JavaTestUtils$.runStreams(JavaTestUtils.scala:102)
 [error] at
 org.apache.spark.streaming.JavaTestUtils.runStreams(JavaTestUtils.scala)
 [error] at
 org.apache.spark.streaming.JavaAPISuite.testCount(JavaAPISuite.java:103)
 [error] ...
 [error] Caused by: org.apache.spark.SparkException: Error sending message
 [message = StopMapOutputTracker]
 [error] at
 org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
 [error] at
 org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
 [error] at
 org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:109)
 [error] ... 52 more
 [error] Caused by: java.util.concurrent.TimeoutException: Futures timed out
 after [120 seconds]
 [error] at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 [error] at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 [error] at
 scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 [error] at

 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 [error] at scala.concurrent.Await$.result(package.scala:107)
 [error] at
 org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
 [error] ... 54 more



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Unit-Test-Failure-Test-org-apache-spark-streaming-JavaAPISuite-testCount-failed-tp22879.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: how to use rdd.countApprox

2015-05-13 Thread Tathagata Das
That is not supposed to happen :/ That is probably a bug.
If you have the log4j logs, it would be good to file a JIRA. This may be worth
debugging.

On Wed, May 13, 2015 at 12:13 PM, Du Li l...@yahoo-inc.com wrote:

 Actually I tried that before asking. However, it killed the spark context.
 :-)

 Du



   On Wednesday, May 13, 2015 12:02 PM, Tathagata Das t...@databricks.com
 wrote:


 That is a good question. I don't see a direct way to do that.

 You could try the following:

 val jobGroupId = "group-id-based-on-current-time"
 rdd.sparkContext.setJobGroup(jobGroupId, "approx count")  // tag the job about to be launched
 val approxCount = rdd.countApprox(5000).initialValue  // job launched with the set job group
 rdd.sparkContext.cancelJobGroup(jobGroupId)  // cancel the job


 On Wed, May 13, 2015 at 11:24 AM, Du Li l...@yahoo-inc.com wrote:

 Hi TD,

 Do you know how to cancel the rdd.countApprox(5000) tasks after the
 timeout? Otherwise it keeps running until completion, producing results not
 used but consuming resources.

 Thanks,
 Du



   On Wednesday, May 13, 2015 10:33 AM, Du Li l...@yahoo-inc.com.INVALID
 wrote:


  Hi TD,

 Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming
 app stands a much better chance of completing the processing of each batch
 within the batch interval.

 Du


   On Tuesday, May 12, 2015 10:31 PM, Tathagata Das t...@databricks.com
 wrote:


 From the code, it seems that as soon as rdd.countApprox(5000)
 returns, you can call pResult.initialValue to get the approximate count
 at that point of time (that is, after the timeout). Calling
 pResult.getFinalValue() will block further until the job is over, and
 give the final correct value that you would have received from rdd.count().

 On Tue, May 12, 2015 at 5:03 PM, Du Li l...@yahoo-inc.com.invalid wrote:

 HI,

 I tested the following in my streaming app and hoped to get an approximate
 count within 5 seconds. However, rdd.countApprox(5000).getFinalValue()
 seemed to always return after it finishes completely, just like
 rdd.count(), which often exceeded 5 seconds. The values for low, mean, and
 high were the same.

 val pResult = rdd.countApprox(5000)
 val bDouble = pResult.getFinalValue()
 logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong}, mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")

 Can any expert here help explain the right way of usage?

 Thanks,
 Du







   On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID
 wrote:


 I have to count RDDs in a Spark Streaming app. When the data gets large,
 count() becomes expensive. Did anybody have experience using countApprox()?
 How accurate/reliable is it?

 The documentation is pretty modest. Suppose the timeout parameter is in
 milliseconds. Can I retrieve the count value by calling getFinalValue()?
 Does it block and return only after the timeout? Or do I need to define
 onComplete/onFail handlers to extract count value from the partial results?

 Thanks,
 Du













Re: how to use rdd.countApprox

2015-05-13 Thread Tathagata Das
You might get stage information through SparkListener. But I am not sure
whether you can use that information to easily kill stages.
Though I highly recommend using Spark 1.3.1 (or even Spark master). Things
move really fast between releases. 1.1.1 feels really old to me :P

TD

On Wed, May 13, 2015 at 1:25 PM, Du Li l...@yahoo-inc.com wrote:

 I do rdd.countApprox() and rdd.sparkContext.setJobGroup() inside
 dstream.foreachRDD{...}. After calling cancelJobGroup(), the spark context
 seems no longer valid, which crashes subsequent jobs.

 My spark version is 1.1.1. I will do more investigation into this issue,
 perhaps after upgrading to 1.3.1, and then file a JIRA if it persists.

 Is there a way to get the stage or task ID of a particular transformation or
 action on an RDD and then selectively kill the stage or tasks? It would be
 necessary and useful in situations similar to countApprox.

 Thanks,
 Du



   On Wednesday, May 13, 2015 1:12 PM, Tathagata Das t...@databricks.com
 wrote:


 That is not supposed to happen :/ That is probably a bug.
 If you have the log4j logs, it would be good to file a JIRA. This may be
 worth debugging.

 On Wed, May 13, 2015 at 12:13 PM, Du Li l...@yahoo-inc.com wrote:

 Actually I tried that before asking. However, it killed the spark context.
 :-)

 Du



   On Wednesday, May 13, 2015 12:02 PM, Tathagata Das t...@databricks.com
 wrote:


 That is a good question. I don't see a direct way to do that.

 You could try the following:

 val jobGroupId = "group-id-based-on-current-time"
 rdd.sparkContext.setJobGroup(jobGroupId, "approx count")  // tag the job about to be launched
 val approxCount = rdd.countApprox(5000).initialValue  // job launched with the set job group
 rdd.sparkContext.cancelJobGroup(jobGroupId)  // cancel the job


 On Wed, May 13, 2015 at 11:24 AM, Du Li l...@yahoo-inc.com wrote:

 Hi TD,

 Do you know how to cancel the rdd.countApprox(5000) tasks after the
 timeout? Otherwise it keeps running until completion, producing results not
 used but consuming resources.

 Thanks,
 Du



   On Wednesday, May 13, 2015 10:33 AM, Du Li l...@yahoo-inc.com.INVALID
 wrote:


  Hi TD,

 Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming
 app stands a much better chance of completing the processing of each batch
 within the batch interval.

 Du


   On Tuesday, May 12, 2015 10:31 PM, Tathagata Das t...@databricks.com
 wrote:


 From the code, it seems that as soon as rdd.countApprox(5000)
 returns, you can call pResult.initialValue to get the approximate count
 at that point of time (that is, after the timeout). Calling
 pResult.getFinalValue() will block further until the job is over, and
 give the final correct value that you would have received from rdd.count().

 On Tue, May 12, 2015 at 5:03 PM, Du Li l...@yahoo-inc.com.invalid wrote:

 HI,

 I tested the following in my streaming app and hoped to get an approximate
 count within 5 seconds. However, rdd.countApprox(5000).getFinalValue()
 seemed to always return after it finishes completely, just like
 rdd.count(), which often exceeded 5 seconds. The values for low, mean, and
 high were the same.

 val pResult = rdd.countApprox(5000)
 val bDouble = pResult.getFinalValue()
 logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong}, mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")

 Can any expert here help explain the right way of usage?

 Thanks,
 Du







   On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID
 wrote:


 I have to count RDDs in a Spark Streaming app. When the data gets large,
 count() becomes expensive. Did anybody have experience using countApprox()?
 How accurate/reliable is it?

 The documentation is pretty modest. Suppose the timeout parameter is in
 milliseconds. Can I retrieve the count value by calling getFinalValue()?
 Does it block and return only after the timeout? Or do I need to define
 onComplete/onFail handlers to extract count value from the partial results?

 Thanks,
 Du
















Re: DStream Union vs. StreamingContext Union

2015-05-12 Thread Tathagata Das
@Vadim What happened when you tried unioning using DStream.union in python?

TD

On Tue, May 12, 2015 at 9:53 AM, Evo Eftimov evo.efti...@isecc.com wrote:

 I can confirm it does work in Java



 *From:* Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com]
 *Sent:* Tuesday, May 12, 2015 5:53 PM
 *To:* Evo Eftimov
 *Cc:* Saisai Shao; user@spark.apache.org

 *Subject:* Re: DStream Union vs. StreamingContext Union



 Thanks Evo. I tried chaining Dstream unions like what you have and it
 didn't work for me. But passing

 multiple arguments to StreamingContext.union worked fine. Any idea why? I
 am using Python, BTW.




 On Tue, May 12, 2015 at 12:45 PM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 You can also union multiple DStream RDDs in this way:
 DstreamRDD1.union(DstreamRDD2).union(DstreamRDD3)  etc.



 Ps: the API is not “redundant” – it offers several ways of achieving the
 same thing as a convenience, depending on the situation



 *From:* Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com]
 *Sent:* Tuesday, May 12, 2015 5:37 PM
 *To:* Saisai Shao
 *Cc:* user@spark.apache.org
 *Subject:* Re: DStream Union vs. StreamingContext Union



 Thanks Saisai. That makes sense. Just seems redundant to have both.




 On Mon, May 11, 2015 at 10:36 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 DStream.union can only union two DStreams, one being itself, while
 StreamingContext.union can union an array of DStreams. Internally,
 DStream.union is a special case of StreamingContext.union:



 def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))



 So there's no difference; if you want to union more than two DStreams,
 just use the one in StreamingContext; otherwise, both APIs are fine.
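
 A small sketch contrasting the two (stream names illustrative):

 val two  = stream1.union(stream2)                      // DStream.union: exactly two streams
 val many = ssc.union(Seq(stream1, stream2, stream3))   // StreamingContext.union: any number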





 2015-05-12 6:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com:

 Can someone explain to me the difference between DStream union and
 StreamingContext union?

 When do you use one vs the other?



 Thanks,

 Vadim










Re: DStream Union vs. StreamingContext Union

2015-05-12 Thread Tathagata Das
I wonder if that may be a bug in the Python API. Please file it as a JIRA
along with sample code to reproduce it and the sample output you get.

On Tue, May 12, 2015 at 10:00 AM, Vadim Bichutskiy 
vadim.bichuts...@gmail.com wrote:

 @TD I kept getting an empty RDD (i.e. rdd.take(1) was False).

 On Tue, May 12, 2015 at 12:57 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 @Vadim What happened when you tried unioning using DStream.union in
 python?

 TD

 On Tue, May 12, 2015 at 9:53 AM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 I can confirm it does work in Java



 *From:* Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com]
 *Sent:* Tuesday, May 12, 2015 5:53 PM
 *To:* Evo Eftimov
 *Cc:* Saisai Shao; user@spark.apache.org

 *Subject:* Re: DStream Union vs. StreamingContext Union



 Thanks Evo. I tried chaining Dstream unions like what you have and it
 didn't work for me. But passing

 multiple arguments to StreamingContext.union worked fine. Any idea why?
 I am using Python, BTW.




 On Tue, May 12, 2015 at 12:45 PM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 You can also union multiple DStream RDDs in this way:
 DstreamRDD1.union(DstreamRDD2).union(DstreamRDD3)  etc.



 Ps: the API is not “redundant” – it offers several ways of achieving
 the same thing as a convenience, depending on the situation



 *From:* Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com]
 *Sent:* Tuesday, May 12, 2015 5:37 PM
 *To:* Saisai Shao
 *Cc:* user@spark.apache.org
 *Subject:* Re: DStream Union vs. StreamingContext Union



 Thanks Saisai. That makes sense. Just seems redundant to have both.




 On Mon, May 11, 2015 at 10:36 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 DStream.union can only union two DStreams, one being itself, while
 StreamingContext.union can union an array of DStreams. Internally,
 DStream.union is a special case of StreamingContext.union:



 def union(that: DStream[T]): DStream[T] = new
 UnionDStream[T](Array(this, that))



 So there's no difference; if you want to union more than two DStreams,
 just use the one in StreamingContext; otherwise, both APIs are fine.





 2015-05-12 6:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com:

 Can someone explain to me the difference between DStream union and
 StreamingContext union?

 When do you use one vs the other?



 Thanks,

 Vadim













Re: how to use rdd.countApprox

2015-05-12 Thread Tathagata Das
From the code, it seems that as soon as rdd.countApprox(5000)
returns, you can call pResult.initialValue to get the approximate count
at that point of time (that is, after the timeout). Calling
pResult.getFinalValue() will block further until the job is over, and
give the final correct value that you would have received from rdd.count().
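
Putting that together, a small sketch (5000 is the timeout in milliseconds):

val pResult = rdd.countApprox(5000)  // returns a PartialResult once the timeout expires
val approx = pResult.initialValue    // the approximate count computed within the timeout
println(s"count in [${approx.low}, ${approx.high}], mean ${approx.mean}")
val exact = pResult.getFinalValue()  // blocks until the job completes, like rdd.count()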

On Tue, May 12, 2015 at 5:03 PM, Du Li l...@yahoo-inc.com.invalid wrote:

 HI,

 I tested the following in my streaming app and hoped to get an approximate
 count within 5 seconds. However, rdd.countApprox(5000).getFinalValue()
 seemed to always return after it finishes completely, just like
 rdd.count(), which often exceeded 5 seconds. The values for low, mean, and
 high were the same.

 val pResult = rdd.countApprox(5000)
 val bDouble = pResult.getFinalValue()
 logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong}, mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")

 Can any expert here help explain the right way of usage?

 Thanks,
 Du







   On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID
 wrote:


 I have to count RDDs in a Spark Streaming app. When the data gets large,
 count() becomes expensive. Did anybody have experience using countApprox()?
 How accurate/reliable is it?

 The documentation is pretty modest. Suppose the timeout parameter is in
 milliseconds. Can I retrieve the count value by calling getFinalValue()?
 Does it block and return only after the timeout? Or do I need to define
 onComplete/onFail handlers to extract count value from the partial results?

 Thanks,
 Du





Re: Receiver Fault Tolerance

2015-05-06 Thread Tathagata Das
Incorrect. The receiver runs in an executor just like any other task. In
cluster mode, the driver runs in a worker; however, it launches
executors in OTHER workers in the cluster. It's those executors running in
other workers that run tasks, and also the receivers.

On Wed, May 6, 2015 at 5:09 AM, James King jakwebin...@gmail.com wrote:

 In the O'reilly book Learning Spark Chapter 10 section 24/7 Operation

 It talks about 'Receiver Fault Tolerance'

 I'm unsure of what a Receiver is here. From reading, it sounds like when
 you submit an application to the cluster in cluster mode, i.e. *--deploy-mode
 cluster*, the driver program will run on a Worker, and in this case this
 Worker is seen as a Receiver because it is consuming messages from the
 source.


 Is the above understanding correct? or is there more to it?





Re: How update counter in cassandra

2015-05-06 Thread Tathagata Das
This may help.

http://www.slideshare.net/helenaedelson/lambda-architecture-with-spark-spark-streaming-kafka-cassandra-akka-and-scala


On Wed, May 6, 2015 at 5:35 PM, Sergio Jiménez Barrio drarse.a...@gmail.com
 wrote:

 I have a counter column family in Cassandra. I want to update these counters
 from an application in Spark Streaming. How can I update Cassandra counters
 with Spark?

 Thanks.



Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Tathagata Das
Is the function ingestToMysql running on the driver or on the executors?
Accordingly you can try debugging while running in a distributed manner,
with and without calling the function.

If you don't get too many open files without calling ingestToMysql(), the
problem is likely to be in ingestToMysql().
If you get the problem even without calling ingestToMysql(), then the
problem may be in Kafka. If the problem is occurring in the driver, then it is in
the DirectKafkaInputDStream code. If the problem is occurring in the
executor, then the problem is in KafkaRDD.

TD

On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote:

 Maybe add statement.close() in finally block ?

 Streaming / Kafka experts may have better insight.
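
 For reference, a sketch of the quoted function below with both handles
 closed in the finally block (buildSql is a hypothetical helper that
 assembles the INSERT statement):

 def ingestToMysql(data: Array[String]): Unit = {
   val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
   var conn: java.sql.Connection = null
   var statement: java.sql.Statement = null
   try {
     conn = DriverManager.getConnection(url)
     statement = conn.createStatement()
     statement.executeUpdate(buildSql(data))  // buildSql: hypothetical SQL assembly
   } catch {
     case e: Exception => logger.error(e.getMessage())
   } finally {
     if (statement != null) statement.close()  // close the statement, not just the connection
     if (conn != null) conn.close()
   }
 }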

 On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Thanks for the suggestion. I ran the command and the limit is 1024.

 Based on my understanding, the connector to Kafka should not open so many
 files. Do you think there is possible socket leakage? BTW, in every batch
 which is 5 seconds, I output some results to mysql:

   def ingestToMysql(data: Array[String]) {
     val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
     var sql = "insert into loggingserver1 values "
     data.foreach(line => sql += line)
     sql = sql.dropRight(1)
     sql += ";"
     logger.info(sql)
     var conn: java.sql.Connection = null
     try {
       conn = DriverManager.getConnection(url)
       val statement = conn.createStatement()
       statement.executeUpdate(sql)
     } catch {
       case e: Exception => logger.error(e.getMessage())
     } finally {
       if (conn != null) {
         conn.close
       }
     }
   }

 I am not sure whether the leakage originates from Kafka connector or the
 sql connections.

 Bill

 On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am using the direct approach to receive real-time data from Kafka in
 the following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
 many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too many
 open files, java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 

Re: Join between Streaming data vs Historical Data in spark

2015-04-29 Thread Tathagata Das
Have you taken a look at the join section in the streaming programming
guide?

http://spark.apache.org/docs/latest/streaming-programming-guide.html#stream-dataset-joins
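
Applied to this use case, a minimal sketch (names are hypothetical: visits is
a pre-loaded pair RDD of (userId, visitInfo), and transactionStream is a
DStream keyed by userId):

val joined = transactionStream.transform { rdd => rdd.join(visits) }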

On Wed, Apr 29, 2015 at 7:11 AM, Rendy Bambang Junior 
rendy.b.jun...@gmail.com wrote:

 Let say I have transaction data and visit data

 visit
 | userId | Visit source | Timestamp |
 | A  | google ads   | 1 |
 | A  | facebook ads | 2 |

 transaction
 | userId | total price | timestamp |
 | A  | 100 | 248384|
 | B  | 200 | 43298739  |

 I want to join transaction data and visit data to do sales attribution. I
 want to do it in real time whenever a transaction occurs (streaming).

 Is it scalable to do a join between one stream and very big historical data
 using the join function in Spark? If it is not, then how is it usually done?

 Visits need to be historical, since a visit can happen anytime before the
 transaction (e.g. the visit is one year before the transaction occurs)

 Rendy



Re: implicit function in SparkStreaming

2015-04-29 Thread Tathagata Das
I believe that the implicit def is pulling the enclosing class (in which
the def is defined) into the closure, and that class is not serializable.


On Wed, Apr 29, 2015 at 4:20 AM, guoqing0...@yahoo.com.hk 
guoqing0...@yahoo.com.hk wrote:

 Hi guys,
 I'm puzzled why I can't use an implicit function in Spark Streaming; it
 causes "Task not serializable".

 code snippet:

 implicit final def str2KeyValue(s: String): (String, String) = {
   val message = s.split("\\|")
   if (message.length >= 2)
     (message(0), message(1))
   else if (message.length == 1) {
     (message(0), "")
   }
   else
     ("", "")
 }

 def filter(stream: DStream[String]): DStream[String] = {
   stream.filter(s => {
     (s._1 == "Action" && s._2 == "TRUE")
   })
 }


 Could you please give me some pointers? Thank you.

 --
 guoqing0...@yahoo.com.hk



Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Tathagata Das
Also cc;ing Cody.

@Cody maybe there is a reason for doing connection pooling even if there is
not performance difference.

TD

On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das t...@databricks.com wrote:

 Is the function ingestToMysql running on the driver or on the executors?
 Accordingly you can try debugging while running in a distributed manner,
 with and without calling the function.

 If you don't get too many open files without calling ingestToMysql(), the
 problem is likely to be in ingestToMysql().
 If you get the problem even without calling ingestToMysql(), then the
 problem may be in Kafka. If the problem is occurring in the driver, then it is in
 the DirectKafkaInputDStream code. If the problem is occurring in the
 executor, then the problem is in KafkaRDD.

 TD

 On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote:

 Maybe add statement.close() in finally block ?

 Streaming / Kafka experts may have better insight.

 On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Thanks for the suggestion. I ran the command and the limit is 1024.

 Based on my understanding, the connector to Kafka should not open so
 many files. Do you think there is possible socket leakage? BTW, in every
 batch which is 5 seconds, I output some results to mysql:

   def ingestToMysql(data: Array[String]) {
     val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
     var sql = "insert into loggingserver1 values "
     data.foreach(line => sql += line)
     sql = sql.dropRight(1)
     sql += ";"
     logger.info(sql)
     var conn: java.sql.Connection = null
     try {
       conn = DriverManager.getConnection(url)
       val statement = conn.createStatement()
       statement.executeUpdate(sql)
     } catch {
       case e: Exception => logger.error(e.getMessage())
     } finally {
       if (conn != null) {
         conn.close
       }
     }
   }

 I am not sure whether the leakage originates from Kafka connector or the
 sql connections.

 Bill

 On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you run the command 'ulimit -n' to see the current limit ?

 To configure ulimit settings on Ubuntu, edit
 */etc/security/limits.conf*
 Cheers

 On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am using the direct approach to receive real-time data from Kafka in
 the following link:

 https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html


 My code follows the word count direct example:


 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala



 After around 12 hours, I got the following error messages in Spark log:

 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
 143033869 ms
 org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
 many open files, java.io.IOException: Too many open files,
 java.io.IOException: Too many open files, java.io.IOException: Too many
 open files, java.io.IOException: Too many open files)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
 at
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
 at
 scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
 at
 org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300

Re: Driver memory leak?

2015-04-29 Thread Tathagata Das
It could be related to this.
https://issues.apache.org/jira/browse/SPARK-6737

This was fixed in Spark 1.3.1.



On Wed, Apr 29, 2015 at 8:38 AM, Sean Owen so...@cloudera.com wrote:

 Not sure what you mean. It's already in CDH since 5.4 = 1.3.0
 (This isn't the place to ask about CDH)
 I also don't think that's the problem. The process did not run out of
 memory.

 On Wed, Apr 29, 2015 at 2:08 PM, Serega Sheypak serega.shey...@gmail.com
 wrote:

 The memory leak could be related to this
 https://issues.apache.org/jira/browse/SPARK-5967 defect that was
 resolved in Spark 1.2.2 and 1.3.0.
 @Sean
 Will it be backported to CDH? I didn't find that bug in the CDH 5.4 release
 notes.

 2015-04-29 14:51 GMT+02:00 Conor Fennell conor.fenn...@altocloud.com:

 The memory leak could be related to this
 https://issues.apache.org/jira/browse/SPARK-5967 defect that was
 resolved in Spark 1.2.2 and 1.3.0.

 It also was a HashMap causing the issue.

 -Conor



 On Wed, Apr 29, 2015 at 12:01 PM, Sean Owen so...@cloudera.com wrote:

 Please use user@, not dev@

 This message does not appear to be from your driver. It also doesn't
 say you ran out of memory. It says you didn't tell YARN to let it use the
 memory you want. Look at the memory overhead param and please search first
 for related discussions.
 On Apr 29, 2015 11:43 AM, wyphao.2007 wyphao.2...@163.com wrote:

 Hi, dear developers, I am using Spark Streaming to read data from
 Kafka. The program has already run for about 120 hours, but today it
 failed because of the driver's OOM, as follows:

 Container
 [pid=49133,containerID=container_1429773909253_0050_02_01] is running
 beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB physical
 memory used; 3.2 GB of 50 GB virtual memory used. Killing container.

 I set --driver-memory to 2g. In my mind, the driver is responsible for
 job scheduling and job monitoring (please correct me if I'm wrong), so why is it
 using
 so much memory?

 So I used jmap to monitor another program (already running for about 48 hours):
 sudo /home/q/java7/jdk1.7.0_45/bin/jmap -histo:live 31256; the result
 is as follows:
 the java.util.HashMap$Entry and java.lang.Long objects use about
 600 MB of memory!

 And I also used jmap to monitor another program (already running for about 1
 hour); the result is as follows:
 the java.util.HashMap$Entry and java.lang.Long objects don't use that
 much memory. But I found that, as time goes by, the
 java.util.HashMap$Entry and java.lang.Long objects occupy more
 and more memory.
 Is it a driver memory leak question? Or some other reason?

 Thanks
 Best Regards















Re: Re: implicit function in SparkStreaming

2015-04-29 Thread Tathagata Das
Could you put the implicit def in an object? That should work, as objects
are never serialized.
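
A sketch of that suggestion (object name illustrative):

object MessageConversions {
  implicit def str2KeyValue(s: String): (String, String) = {
    val message = s.split("\\|")
    if (message.length >= 2) (message(0), message(1))
    else if (message.length == 1) (message(0), "")
    else ("", "")
  }
}

// At the call site only the import is needed; no enclosing instance is captured:
import MessageConversions._
def filter(stream: DStream[String]): DStream[String] =
  stream.filter(s => s._1 == "Action" && s._2 == "TRUE")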


On Wed, Apr 29, 2015 at 6:28 PM, guoqing0...@yahoo.com.hk 
guoqing0...@yahoo.com.hk wrote:

 Thank you for your pointers, they are very helpful to me. In this scenario,
 how can I use the implicit def in the enclosing class?



 *From:* Tathagata Das t...@databricks.com
 *Date:* 2015-04-30 07:00
 *To:* guoqing0...@yahoo.com.hk
 *CC:* user user@spark.apache.org
 *Subject:* Re: implicit function in SparkStreaming
 I believe that the implicit def is pulling in the enclosing class (in
 which the def is defined) in the closure which is not serializable.


 On Wed, Apr 29, 2015 at 4:20 AM, guoqing0...@yahoo.com.hk 
 guoqing0...@yahoo.com.hk wrote:

 Hi guys,
 I'm puzzled why I can't use an implicit function in Spark Streaming; it
 causes "Task not serializable".

 code snippet:

 implicit final def str2KeyValue(s: String): (String, String) = {
   val message = s.split("\\|")
   if (message.length >= 2)
     (message(0), message(1))
   else if (message.length == 1) {
     (message(0), "")
   }
   else
     ("", "")
 }

 def filter(stream: DStream[String]): DStream[String] = {
   stream.filter(s => {
     (s._1 == "Action" && s._2 == "TRUE")
   })
 }


 Could you please give me some pointers? Thank you.

 --
 guoqing0...@yahoo.com.hk





Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-23 Thread Tathagata Das
What was the state of your streaming application? Was it falling behind
with a large increasing scheduling delay?

TD

On Thu, Apr 23, 2015 at 11:31 AM, N B nb.nos...@gmail.com wrote:

 Thanks for the response, Conor. I tried with those settings and for a
 while it seemed like it was cleaning up shuffle files after itself.
 However, exactly 5 hours later it started throwing exceptions and
 eventually stopped working again. A sample stack trace is below. What is
 curious about 5 hours is that I set the cleaner ttl to 5 hours after
 changing the max window size to 1 hour (down from 6 hours in order to
 test). It also stopped cleaning the shuffle files after this started
 happening.

 Any idea why this could be happening?

 2015-04-22 17:39:52,040 ERROR Executor task launch worker-989
 Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147)
 java.lang.Exception: Could not compute split, block input-0-1429706099000
 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 Thanks
 NB


 On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell 
 conor.fenn...@altocloud.com wrote:

 Hi,


 We set the spark.cleaner.ttl to some reasonable time and also
 set spark.streaming.unpersist=true.


 Those together cleaned up the shuffle files for us.
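
 For reference, a sketch of those two settings (the TTL value is illustrative
 and must exceed the longest window):

 val conf = new SparkConf()
   .set("spark.cleaner.ttl", "43200")          // seconds, e.g. 12 hours
   .set("spark.streaming.unpersist", "true")   // unpersist RDDs once processed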


 -Conor

 On Tue, Apr 21, 2015 at 8:18 AM, N B nb.nos...@gmail.com wrote:

 We already do have a cron job in place to clean just the shuffle files.
 However, what I would really like to know is whether there is a proper
 way of telling Spark to clean up these files once it's done with them.

 Thanks
 NB


 On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele 
 gangele...@gmail.com wrote:

 Write a cron job for this like the one below

 12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
 32 * * * *  find /tmp -type d -cmin +1440 -name spark-*-*-* -prune
 -exec rm -rf {} \+
 52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin
 +1440 -name spark-*-*-* -prune -exec rm -rf {} \+


 On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote:

 Hi all,

 I had posed this query as part of a different thread but did not get a
 response there. So creating a new thread hoping to catch someone's
 attention.

 We are experiencing this issue of shuffle files being left behind and
 not being cleaned up by Spark. Since this is a Spark streaming 
 application,
 it is expected to stay up indefinitely, so shuffle files not being cleaned
 up is a big problem right now. Our max window size is 6 hours, so we have
 set up a cron job to clean up shuffle files older than 12 hours otherwise
 it will eat up all our disk space.

 Please see the following. It seems the non-cleaning of shuffle files
 is being documented in 1.3.1.

 https://github.com/apache/spark/pull/5074/files
 https://issues.apache.org/jira/browse/SPARK-5836


 Also, for some reason, the following JIRAs that were reported as
 functional issues were closed as Duplicates of the above Documentation 
 bug.
 Does this mean that this issue won't be tackled at all?

 https://issues.apache.org/jira/browse/SPARK-3563
 https://issues.apache.org/jira/browse/SPARK-4796
 https://issues.apache.org/jira/browse/SPARK-6011

 Any further insight into whether this is being looked into and
 meanwhile how to handle shuffle files will be greatly appreciated.

 Thanks
 NB











Re: Convert DStream to DataFrame

2015-04-22 Thread Tathagata Das
Did you checkout the latest streaming programming guide?

http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations

You also need to be aware that to convert JSON RDDs to a DataFrame,
sqlContext has to make a pass over the data to learn the schema. This will
fail if a batch has no data. You have to safeguard against that.
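
A minimal sketch of that safeguard with the Spark 1.3 API (table name
illustrative):

messages.map(_._2).foreachRDD { rdd =>
  if (!rdd.isEmpty()) {                 // safeguard: schema inference fails on an empty batch
    val df = sqlContext.jsonRDD(rdd)    // one pass over the data to infer the schema
    df.registerTempTable("events")
    sqlContext.sql("select count(*) from events").show()
  }
}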

On Wed, Apr 22, 2015 at 6:19 AM, ayan guha guha.a...@gmail.com wrote:

 What about sqlContext.createDataFrame(rdd)?
 On 22 Apr 2015 23:04, Sergio Jiménez Barrio drarse.a...@gmail.com
 wrote:

 Hi,

 I am using Kafka with Apache Stream to send JSON to Apache Spark:

 val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
 StringDecoder](ssc, kafkaParams, topicsSet)

 Now, I want to parse the created DStream into a DataFrame, but I don't know if
 Spark 1.3 has an easy way to do this. Any suggestions? I can get the
 message with:

 val lines = messages.map(_._2)

 Thank you all. Sergio J.





Re: Map Question

2015-04-22 Thread Tathagata Das
Is the mylist present on every executor? If not, then you have to pass it
on. And broadcasts are the best way to pass them on. But note that once
broadcast, it will be immutable at the executors, and if you update the list
at the driver, you will have to broadcast it again.

TD

On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy 
vadim.bichuts...@gmail.com wrote:

 I am using Spark Streaming with Python. For each RDD, I call a map, i.e.,
 myrdd.map(myfunc), myfunc is in a separate Python module. In yet another
 separate Python module I have a global list, i.e. mylist, that's populated
 with metadata. I can't get myfunc to see mylist...it's always empty.
 Alternatively, I guess I could pass mylist to map.

 Any suggestions?

 Thanks,
 Vadim



Re: Not able to run multiple tasks in parallel, spark streaming

2015-04-22 Thread Tathagata Das
Furthermore, just to explain, doing arr.par.foreach does not help because
it is not really running anything; it is only doing the setup of the
computation. Doing the setup in parallel does not mean that the jobs will be
executed concurrently.

Also, from your code it seems like your pairs of dstreams don't interact
with each other (that is, pair1 does not interact with pair2). Then you could
run them in separate applications, which would allow them to run in
parallel.
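
For reference, a minimal sketch of how the experimental concurrent-jobs flag
mentioned in the quoted reply below would be set; it has to go on the
SparkConf before the StreamingContext is created, and the app name and batch
interval here are arbitrary choices:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("parallel-joins")
      .set("spark.streaming.concurrentJobs", "2")  // undocumented, experimental flag
    val ssc = new StreamingContext(conf, Seconds(10))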


On Tue, Apr 21, 2015 at 11:53 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 You can enable this flag to run multiple jobs concurrently, It might not
 be production ready, but you can give it a try:

 sc.set("spark.streaming.concurrentJobs", "2")

 ​Refer to TD's answer here
 http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming#answers-header
 for more information.


 Thanks
 Best Regards

 On Wed, Apr 22, 2015 at 8:57 AM, Abhay Bansal abhaybansal.1...@gmail.com
 wrote:

 Hi,

 I have a use case wherein I have to join multiple Kafka topics in parallel.
 So if there are 2n topics, there is a one-to-one mapping between the topics
 that need to be joined.


 val arr = ...

 for (condition) {

   val dStream1 = KafkaUtils.createDirectStream[String, String,
     StringDecoder, StringDecoder](ssc, kafkaParams, topics1)
     .map(a => (getKey1(a._2), a._2))

   val dStream2 = KafkaUtils.createDirectStream[String, String,
     StringDecoder, StringDecoder](ssc, kafkaParams, topics2)
     .map(a => (getKey2(a._2), a._2))

   arr(counter) = (dStream1, dStream2)

   counter += 1

 }

 arr.par.foreach {

   case (dStream1, dStream2) => try {

     val joined = dStream1.join(dStream2, 4)

     joined.saveAsTextFiles("joinedData")

   } catch {

     case t: Exception => t.printStackTrace()

   }

 }

 ssc.start()

 ssc.awaitTermination()


 Doing so, the streams are getting joined sequentially. Is there a way
 out of this? I am new to Spark and would appreciate any suggestions around
 this.


 Thanks,

 -Abhay





Re: Map Question

2015-04-22 Thread Tathagata Das
Absolutely. The same code would work for local as well as distributed mode!

On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy 
vadim.bichuts...@gmail.com wrote:

 Can I use broadcast vars in local mode?

 On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com
 wrote:

 Yep. Not efficient. Pretty bad actually. That's why broadcast variable
 were introduced right at the very beginning of Spark.



 On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Thanks TD. I was looking into broadcast variables.

 Right now I am running it locally...and I plan to move it to
 production on EC2.

 The way I fixed it is by doing myrdd.map(lambda x: (x,
 mylist)).map(myfunc) but I don't think it's efficient?

 mylist is filled only once at the start and never changes.

 Vadim

 On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com
 wrote:

 Is the mylist present on every executor? If not, then you have to pass
 it on. And broadcasts are the best way to pass them on. But note that once
 broadcast, it will be immutable at the executors, and if you update the list
 at the driver, you will have to broadcast it again.

 TD

 On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 I am using Spark Streaming with Python. For each RDD, I call a map,
 i.e., myrdd.map(myfunc), myfunc is in a separate Python module. In yet
 another separate Python module I have a global list, i.e. mylist,
 that's populated with metadata. I can't get myfunc to see mylist...it's
 always empty. Alternatively, I guess I could pass mylist to map.

 Any suggestions?

 Thanks,
 Vadim








Re: Map Question

2015-04-22 Thread Tathagata Das
Yep. Not efficient. Pretty bad actually. That's why broadcast variable were
introduced right at the very beginning of Spark.



On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy 
vadim.bichuts...@gmail.com wrote:

 Thanks TD. I was looking into broadcast variables.

 Right now I am running it locally...and I plan to move it to production
 on EC2.

 The way I fixed it is by doing myrdd.map(lambda x: (x,
 mylist)).map(myfunc) but I don't think it's efficient?

 mylist is filled only once at the start and never changes.

 Vadim

 On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com
 wrote:

 Is the mylist present on every executor? If not, then you have to pass it
 on. And broadcasts are the best way to pass them on. But note that once
 broadcast, it will be immutable at the executors, and if you update the list
 at the driver, you will have to broadcast it again.

 TD

 On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 I am using Spark Streaming with Python. For each RDD, I call a map,
 i.e., myrdd.map(myfunc), myfunc is in a separate Python module. In yet
 another separate Python module I have a global list, i.e. mylist,
 that's populated with metadata. I can't get myfunc to see mylist...it's
 always empty. Alternatively, I guess I could pass mylist to map.

 Any suggestions?

 Thanks,
 Vadim






Re: Spark Streaming updateStateByKey throws OutOfMemory Error

2015-04-22 Thread Tathagata Das
It could very well be that your executor memory is not enough to store the
state RDDs AND operate on the data. 1G per executor is quite low.
Definitely give more memory. And have you tried increasing the number of
partitions (specify number of partitions in updateStateByKey) ?
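
A sketch of that partition hint, assuming pairs is the keyed DStream being
updated and 64 is an arbitrary count:

    // More, smaller state partitions spread the per-batch state update
    // across executors and reduce per-task memory pressure.
    val state = pairs.updateStateByKey[(Long, Long)](updateFunc, numPartitions = 64)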

On Wed, Apr 22, 2015 at 2:34 AM, Sourav Chandra 
sourav.chan...@livestream.com wrote:

 Anyone?

 On Wed, Apr 22, 2015 at 12:29 PM, Sourav Chandra 
 sourav.chan...@livestream.com wrote:

  Hi Olivier,
 
  the update function is as below:

  val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {
    val previousCount = state.getOrElse((0L, 0L))._2
    var startValue: IConcurrentUsers = ConcurrentViewers(0)
    var currentCount = 0L
    val lastIndexOfConcurrentUsers =
      values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])
    val subList = values.slice(0, lastIndexOfConcurrentUsers)
    val currentCountFromSubList = subList.foldLeft(startValue)(_ op _).count + previousCount
    val lastConcurrentViewersCount = values(lastIndexOfConcurrentUsers).count

    if (math.abs(lastConcurrentViewersCount - currentCountFromSubList) >= 1) {
      logger.error(
        s"Count using state updation $currentCountFromSubList, " +
        s"ConcurrentUsers count $lastConcurrentViewersCount" +
        s" resetting to $lastConcurrentViewersCount"
      )
      currentCount = lastConcurrentViewersCount
    }
    val remainingValuesList = values.diff(subList)
    startValue = ConcurrentViewers(currentCount)
    currentCount = remainingValuesList.foldLeft(startValue)(_ op _).count

    if (currentCount < 0) {
      logger.error(
        s"ERROR: Got new count $currentCount < 0, value:$values, state:$state, resetting to 0"
      )
      currentCount = 0
    }
    // to stop pushing subsequent 0 after receiving first 0
    if (currentCount == 0 && previousCount == 0) None
    else Some(previousCount, currentCount)
  }

  trait IConcurrentUsers {
    val count: Long
    def op(a: IConcurrentUsers): IConcurrentUsers = IConcurrentUsers.op(this, a)
  }

  object IConcurrentUsers {
    def op(a: IConcurrentUsers, b: IConcurrentUsers): IConcurrentUsers = (a, b) match {
      case (_, _: ConcurrentViewers) =>
        ConcurrentViewers(b.count)
      case (_: ConcurrentViewers, _: IncrementConcurrentViewers) =>
        ConcurrentViewers(a.count + b.count)
      case (_: ConcurrentViewers, _: DecrementConcurrentViewers) =>
        ConcurrentViewers(a.count - b.count)
    }
  }

  case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
  case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
  case class ConcurrentViewers(count: Long) extends IConcurrentUsers
 
 
  also the error stack trace copied from executor logs is:

  java.lang.OutOfMemoryError: Java heap space
      at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
      at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564)
      at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
      at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
      at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
      at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
      at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:601)
      at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
      at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
      at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
      at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
      at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)
      at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
      at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
      at ...

Re: Map Question

2015-04-22 Thread Tathagata Das
Can you give the full code, especially myfunc?

On Wed, Apr 22, 2015 at 2:20 PM, Vadim Bichutskiy 
vadim.bichuts...@gmail.com wrote:

 Here's what I did:

 print 'BROADCASTING...'
 broadcastVar = sc.broadcast(mylist)
 print broadcastVar
 print broadcastVar.value
 print 'FINISHED BROADCASTING...'

 The above works fine,

 but when I call myrdd.map(myfunc) I get *NameError: global name
 'broadcastVar' is not defined*

 The myfunc function is in a different module. How do I make it aware of
 broadcastVar?

 On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Great. Will try to modify the code. Always room to optimize!

 On Wed, Apr 22, 2015 at 2:11 PM, Tathagata Das t...@databricks.com
 wrote:

 Absolutely. The same code would work for local as well as distributed
 mode!

 On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Can I use broadcast vars in local mode?

 On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com
 wrote:

 Yep. Not efficient. Pretty bad actually. That's why broadcast variable
 were introduced right at the very beginning of Spark.



 On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Thanks TD. I was looking into broadcast variables.

 Right now I am running it locally...and I plan to move it to
 production on EC2.

 The way I fixed it is by doing myrdd.map(lambda x: (x,
 mylist)).map(myfunc) but I don't think it's efficient?

 mylist is filled only once at the start and never changes.

 Vadim

 On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com
 wrote:

 Is the mylist present on every executor? If not, then you have to
 pass it on. And broadcasts are the best way to pass them on. But note that
 once broadcast, it will be immutable at the executors, and if you update the
 list at the driver, you will have to broadcast it again.

 TD

 On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 I am using Spark Streaming with Python. For each RDD, I call a map,
 i.e., myrdd.map(myfunc), myfunc is in a separate Python module. In yet
 another separate Python module I have a global list, i.e. mylist,
 that's populated with metadata. I can't get myfunc to see mylist...it's
 always empty. Alternatively, I guess I could pass mylist to map.

 Any suggestions?

 Thanks,
 Vadim











Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized

2015-04-22 Thread Tathagata Das
Vaguely makes sense. :) Wow that's an interesting corner case.

On Wed, Apr 22, 2015 at 1:57 PM, Jean-Pascal Billaud j...@tellapart.com
wrote:

 I have now a fair understanding of the situation after looking at javap
 output. So as a reminder:

 dstream.map(tuple => {
   val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
   (tuple._1, (tuple._2, w)) })

 And StreamState being a very simple standalone object:

 object StreamState {
   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key:
 K) : Option[V] = None
 }

 Basically the serialization failed because the ClassTag[K] came from the
 enclosing class, in which the dstream.map() code is running e.g. :

 class A[K : ClassTag](val dstream: DStream[K]) {
   [...]
   def fun =
     dstream.map(tuple => {
       val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
       (tuple._1, (tuple._2, w)) })
 }

 therefore the instance of class A is being serialized, and it fails when
 the dstream field's writeObject() is called and checks for the graph field...

 The fact that graph is not set might be expected, given that I have not
 started the context yet...
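
A sketch of one workaround, using the StreamState object from this thread:
copy the implicit class parameters into local vals so the closure captures
those small values instead of `this` (whose dstream field is what fails to
serialize). The DStream element type and the prefix-key handling here are
simplified assumptions:

    import scala.reflect.ClassTag
    import org.apache.spark.streaming.dstream.DStream

    class A[K](val dstream: DStream[(K, String)])(implicit kt: ClassTag[K], ord: Ordering[K]) {
      def fun(prefixKey: String): DStream[(K, (String, Option[String]))] = {
        val localKt = kt    // plain local copies: the closure now captures
        val localOrd = ord  // these two small values instead of `this`
        dstream.map { tuple =>
          val w = StreamState.fetch(prefixKey, tuple._1)(localKt, localOrd, implicitly[ClassTag[String]])
          (tuple._1, (tuple._2, w))
        }
      }
    }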

 Cheers,


 On Tue, Apr 21, 2015 at 6:17 PM, Tathagata Das t...@databricks.com
 wrote:

 It is kind of unexpected; I cannot imagine a real scenario under which it
 should trigger. But obviously I am missing something :)

 TD

 On Tue, Apr 21, 2015 at 4:23 PM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 Sure. But in general, I am assuming this Graph is unexpectedly null
 when DStream is being serialized must mean something. Under which
 circumstances, such an exception would trigger?

 On Tue, Apr 21, 2015 at 4:12 PM, Tathagata Das t...@databricks.com
 wrote:

 Yeah, I am not sure what is going on. The only way to figure it out is to
 take a look at the disassembled bytecode using javap.

 TD

 On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 At this point I am assuming that nobody has an idea... I am still
 going to give it a last shot just in case it was missed by some people :)

 Thanks,

 On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud j...@tellapart.com
  wrote:

 Hey, so I start the context at the very end when all the piping is
 done. BTW a foreachRDD will be called on the resulting dstream.map() 
 right
 after that.

 The puzzling thing is why removing the context bounds solves the
 problem... What does this exception mean in general?

 On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com
 wrote:

 When are you getting this exception? After starting the context?

 TD

 On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud 
 j...@tellapart.com wrote:

 Hi,

 I am getting this serialization exception and I am not too sure
 what Graph is unexpectedly null when DStream is being serialized 
 means?

 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED,
 exitCode: 15, (reason: User class threw exception: Task not serializable)
 Exception in thread "Driver" org.apache.spark.SparkException: Task not serializable
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
     at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
     at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
 [...]
 Caused by: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized.
     at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:420)
     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
     at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:403)

 The operation comes down to something like this:

 dstream.map(tuple => {
   val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
   (tuple._1, (tuple._2, w)) })

 And StreamState being a very simple standalone object:

 object StreamState {
   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey:
 String, key: K) : Option[V] = None
 }

 However if I remove the context bounds from K in fetch e.g.
 removing ClassTag and Ordering then everything is fine.

 If anyone has some pointers, I'd really appreciate it.

 Thanks,











Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized

2015-04-21 Thread Tathagata Das
It is kind of unexpected; I cannot imagine a real scenario under which it
should trigger. But obviously I am missing something :)

TD

On Tue, Apr 21, 2015 at 4:23 PM, Jean-Pascal Billaud j...@tellapart.com
wrote:

 Sure. But in general, I am assuming this Graph is unexpectedly null
 when DStream is being serialized must mean something. Under which
 circumstances, such an exception would trigger?

 On Tue, Apr 21, 2015 at 4:12 PM, Tathagata Das t...@databricks.com
 wrote:

 Yeah, I am not sure what is going on. The only way to figure it out is to
 take a look at the disassembled bytecode using javap.

 TD

 On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 At this point I am assuming that nobody has an idea... I am still going
 to give it a last shot just in case it was missed by some people :)

 Thanks,

 On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 Hey, so I start the context at the very end when all the piping is
 done. BTW a foreachRDD will be called on the resulting dstream.map() right
 after that.

 The puzzling thing is why removing the context bounds solves the
 problem... What does this exception mean in general?

 On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com
 wrote:

 When are you getting this exception? After starting the context?

 TD

 On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud 
 j...@tellapart.com wrote:

 Hi,

 I am getting this serialization exception and I am not too sure what
 Graph is unexpectedly null when DStream is being serialized means?

 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED,
 exitCode: 15, (reason: User class threw exception: Task not serializable)
 Exception in thread "Driver" org.apache.spark.SparkException: Task not serializable
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
     at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
     at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
 [...]
 Caused by: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized.
     at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:420)
     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
     at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:403)

 The operation comes down to something like this:

 dstream.map(tuple => {
   val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
   (tuple._1, (tuple._2, w)) })

 And StreamState being a very simple standalone object:

 object StreamState {
   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String,
 key: K) : Option[V] = None
 }

 However if I remove the context bounds from K in fetch e.g. removing
 ClassTag and Ordering then everything is fine.

 If anyone has some pointers, I'd really appreciate it.

 Thanks,









Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized

2015-04-21 Thread Tathagata Das
Yeah, I am not sure what is going on. The only way to figure it out is to take a
look at the disassembled bytecode using javap.

TD

On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud j...@tellapart.com
wrote:

 At this point I am assuming that nobody has an idea... I am still going to
 give it a last shot just in case it was missed by some people :)

 Thanks,

 On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 Hey, so I start the context at the very end when all the piping is done.
 BTW a foreachRDD will be called on the resulting dstream.map() right after
 that.

 The puzzling thing is why removing the context bounds solves the
 problem... What does this exception mean in general?

 On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com
 wrote:

 When are you getting this exception? After starting the context?

 TD

 On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 Hi,

 I am getting this serialization exception and I am not too sure what
 Graph is unexpectedly null when DStream is being serialized means?

 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED,
 exitCode: 15, (reason: User class threw exception: Task not serializable)
 Exception in thread "Driver" org.apache.spark.SparkException: Task not serializable
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
     at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
     at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
 [...]
 Caused by: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized.
     at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:420)
     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
     at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:403)

 The operation comes down to something like this:

 dstream.map(tuple => {
   val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
   (tuple._1, (tuple._2, w)) })

 And StreamState being a very simple standalone object:

 object StreamState {
   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String,
 key: K) : Option[V] = None
 }

 However if I remove the context bounds from K in fetch e.g. removing
 ClassTag and Ordering then everything is fine.

 If anyone has some pointers, I'd really appreciate it.

 Thanks,







Re: SparkStreaming onStart not being invoked on CustomReceiver attached to master with multiple workers

2015-04-20 Thread Tathagata Das
Responses inline.

On Mon, Apr 20, 2015 at 3:27 PM, Ankit Patel patel7...@hotmail.com wrote:

 What you said is correct, and I am expecting the printlns to be in my
 console or my Spark UI. I do not see them in either place.

Can you actually log in to the machine running the executor which runs the
receiver, and then check the executor logs on that machine to see whether
the expected onStart logs are there?


 However, if you run the program, then the printlns do print for the
 constructor of the receiver and for the foreach statements, with a total
 count of 0.

That's expected. In all cases, with or without a master, the receiver object
is constructed in the driver. With a master, the receiver gets serialized and
sent to a worker machine, and the rest of the prints go to the executor logs.


 When you run it in regular mode with no master attached, then you will see
 the counts being printed out in the console as well. Please compile my
 program and try it out; I have spent significant time on debugging where it
 can go wrong and could not find an answer.


As I said, checkout the executor logs in the worker machine running the
receiver. You can identify that machine from the streaming tab in the Spark
UI.


 I also see the starting receiver logs from spark when no master is
 defined, but do not see it when there is. Also, I am running some other
 simple code with spark-submit with printlns and I do see them in my
 SparkUI, but not for spark streaming.

That's expected, for the same reason. Without a master, the receiver logs are
in the same driver process. With a master, they go to the executor logs.

As such, it's not clear what you are trying to debug. If you just want to
see printlns, then this seems to be the expected behavior; nothing is
wrong. Run without a master for convenient debugging (you can see all logs
and prints), and then run in distributed mode for actual deployment
(where it's harder to see those logs and prints).


 Thanks,
 Ankit

 --
 From: t...@databricks.com
 Date: Mon, 20 Apr 2015 13:29:31 -0700
 Subject: Re: SparkStreaming onStart not being invoked on CustomReceiver
 attached to master with multiple workers
 To: patel7...@hotmail.com
 CC: ak...@sigmoidanalytics.com; user@spark.apache.org


 Well, the receiver always runs as part of an executor. When running
 locally (that is, spark-submit without --master), the executor is in the
 same process as the driver, so you see the printlns. If you are running
 with --master spark://cluster, then the executors are running in different
 processes and possibly on different nodes. Hence you don't see the printlns in
 the output of the driver process. If you look at the output of executors in the
 Spark UI, then you may find those prints.

 TD

 On Mon, Apr 20, 2015 at 5:16 AM, Ankit Patel patel7...@hotmail.com
 wrote:

 The code I've written is simple: it just starts a thread and calls the
 store method on the Receiver class.

  I see this code with printlns working fine when I try spark-submit --jars
 jar --class test.TestCustomReceiver jar

 However it does not work with I try the same command above with --master
 spark://masterURL
 spark-submit --master spark://masterURL --jars jar --class
 test.TestCustomReceiver jar

 I also tried setting the master in the conf that I created, but that
 does not work either. I do not see the onStart println being printed when I
 use the --master option. Please advise.

 Also, the master I am attaching to has multiple workers across hosts with
 many threads available to it.

 The code is pasted below (Classes: TestReviever, TestCustomReceiver):



 package test;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.receiver.Receiver;

 public class TestReceiver extends Receiver<String> {

   public TestReceiver() {
     super(StorageLevel.MEMORY_ONLY());
     System.out.println("Ankit: Created TestReceiver");
   }

   @Override
   public void onStart() {
     System.out.println("Start TestReceiver");
     new TestThread().start();
   }

   public void onStop() {}

   @SuppressWarnings("unused")
   private class TestThread extends Thread {
     @Override
     public void run() {
       while (true) {
         try {
           sleep((long) (Math.random() * 3000));
         } catch (Exception e) {
           e.printStackTrace();
         }
         store("Time: " + System.currentTimeMillis());
       }
     }
   }
 }



 package test;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import 

Re: RAM management during cogroup and join

2015-04-15 Thread Tathagata Das
Significant optimizations can be made by doing the join/cogroup in a
smart way. If you have to join streaming RDDs with the same batch RDD, then
you can first partition the batch RDD using a partitioner and cache it, and
then use the same partitioner on the streaming RDDs. That would make sure
that the large batch RDD is not partitioned repeatedly for the cogroup;
only the small streaming RDDs are partitioned.

HTH

TD
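
A minimal sketch of that pattern, assuming an existing SparkContext sc, a
keyed pairDStream, and a hypothetical keyed reference file; the partition
count is arbitrary:

    import org.apache.spark.HashPartitioner

    val partitioner = new HashPartitioner(64)

    // Key, partition, and cache the big batch RDD exactly once.
    val batchRdd = sc.textFile("hdfs://.../reference.tsv")
      .map { line => val f = line.split('\t'); (f(0), f(1)) }
      .partitionBy(partitioner)
      .cache()

    // Per batch, only the small streaming RDD is shuffled into place;
    // the co-partitioned join does not reshuffle batchRdd.
    val joined = pairDStream.transform { rdd =>
      rdd.partitionBy(partitioner).join(batchRdd)
    }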

On Wed, Apr 15, 2015 at 1:11 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 There are indications that joins in Spark are implemented with / based on
 the
 cogroup function/primitive/transform. So let me focus first on cogroup - it
 returns a result which is RDD consisting of essentially ALL elements of the
 cogrouped RDDs. Said in another way - for every key in each of the
 cogrouped
 RDDs there is at least one element from at least one of the cogrouped RDDs.

 That would mean that when smaller, moreover streaming e.g.
 JavaPairDstreamRDDs keep getting joined with much larger, batch RDD that
 would result in RAM allocated for multiple instances of the result
 (cogrouped) RDD a.k.a essentially the large batch RDD and some more ...
 Obviously the RAM will get returned when the DStream RDDs get discarded, and
 they do on a regular basis, but still that seems an unnecessary spike in
 RAM consumption.

 I have two questions:

 1. Is there any way to control the cogroup process more precisely, e.g. tell
 it to include in the cogrouped RDD only elements where there is at least one
 element from EACH of the cogrouped RDDs per given key? Based on the current
 cogroup API this is not possible.


 2. If the cogroup is really such a sledgehammer, and secondly the joins are
 based on cogroup, then even though they can present a prettier picture in
 terms of the end result visible to the end user, does that mean that under
 the hood there is still the same atrocious RAM consumption going on?




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/RAM-management-during-cogroup-and-join-tp22505.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: RAM management during cogroup and join

2015-04-15 Thread Tathagata Das
Agreed.

On Wed, Apr 15, 2015 at 1:29 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 That has been done Sir and represents further optimizations – the
 objective here was to confirm whether cogroup always results in the
 previously described “greedy” explosion of the number of elements included
 and RAM allocated for the result RDD



 The optimizations mentioned still don’t change the total number of
 elements included in the result RDD and RAM allocated – right?



 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Wednesday, April 15, 2015 9:25 PM
 *To:* Evo Eftimov
 *Cc:* user
 *Subject:* Re: RAM management during cogroup and join



 Significant optimizations can be made by doing the join/cogroup in a
 smart way. If you have to join streaming RDDs with the same batch RDD, then
 you can first partition the batch RDD using a partitioner and cache it, and
 then use the same partitioner on the streaming RDDs. That would make sure
 that the large batch RDD is not partitioned repeatedly for the cogroup;
 only the small streaming RDDs are partitioned.



 HTH



 TD



 On Wed, Apr 15, 2015 at 1:11 PM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 There are indications that joins in Spark are implemented with / based on
 the
 cogroup function/primitive/transform. So let me focus first on cogroup - it
 returns a result which is RDD consisting of essentially ALL elements of the
 cogrouped RDDs. Said in another way - for every key in each of the
 cogrouped
 RDDs there is at least one element from at least one of the cogrouped RDDs.

 That would mean that when smaller, moreover streaming e.g.
 JavaPairDstreamRDDs keep getting joined with much larger, batch RDD that
 would result in RAM allocated for multiple instances of the result
 (cogrouped) RDD a.k.a essentially the large batch RDD and some more ...
 Obviously the RAM will get returned when the DStream RDDs get discarded, and
 they do on a regular basis, but still that seems an unnecessary spike in
 RAM consumption.

 I have two questions:

 1. Is there any way to control the cogroup process more precisely, e.g. tell
 it to include in the cogrouped RDD only elements where there is at least one
 element from EACH of the cogrouped RDDs per given key? Based on the current
 cogroup API this is not possible.


 2. If the cogroup is really such a sledgehammer, and secondly the joins are
 based on cogroup, then even though they can present a prettier picture in
 terms of the end result visible to the end user, does that mean that under
 the hood there is still the same atrocious RAM consumption going on?




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/RAM-management-during-cogroup-and-join-tp22505.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: RAM management during cogroup and join

2015-04-15 Thread Tathagata Das
Well, DStream joins are nothing but RDD joins at their core. However, there
are more optimizations available if you use DataFrames and Spark SQL joins.
With the schema, there is greater scope for optimizing the joins. So
converting the streaming RDDs and the batch RDDs to DataFrames, and then
applying the joins, may improve performance.

TD
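
A sketch of the DataFrame variant under the same assumptions (a keyed
batchRdd, a keyed pairDStream of string pairs, and an existing sqlContext);
the join condition uses the Spark 1.3 expression form, and Record is a
hypothetical case class:

    case class Record(key: String, value: String)

    val batchDF = sqlContext.createDataFrame(batchRdd.map { case (k, v) => Record(k, v) })

    pairDStream.foreachRDD { rdd =>
      val streamDF = sqlContext.createDataFrame(rdd.map { case (k, v) => Record(k, v) })
      val joined = streamDF.join(batchDF, streamDF("key") === batchDF("key"))
      joined.count()  // stand-in for whatever output operation is needed
    }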

On Wed, Apr 15, 2015 at 1:50 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 Thank you Sir, and one final confirmation/clarification: are all forms
 of joins in the Spark API for DStream RDDs based on cogroup in terms of
 their internal implementation?



 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Wednesday, April 15, 2015 9:48 PM

 *To:* Evo Eftimov
 *Cc:* user
 *Subject:* Re: RAM management during cogroup and join



 Agreed.



 On Wed, Apr 15, 2015 at 1:29 PM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 That has been done Sir and represents further optimizations – the
 objective here was to confirm whether cogroup always results in the
 previously described “greedy” explosion of the number of elements included
 and RAM allocated for the result RDD



 The optimizations mentioned still don’t change the total number of
 elements included in the result RDD and RAM allocated – right?



 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Wednesday, April 15, 2015 9:25 PM
 *To:* Evo Eftimov
 *Cc:* user
 *Subject:* Re: RAM management during cogroup and join



 Significant optimizations can be made by doing the join/cogroup in a
 smart way. If you have to join streaming RDDs with the same batch RDD, then
 you can first partition the batch RDD using a partitioner and cache it, and
 then use the same partitioner on the streaming RDDs. That would make sure
 that the large batch RDD is not partitioned repeatedly for the cogroup;
 only the small streaming RDDs are partitioned.



 HTH



 TD



 On Wed, Apr 15, 2015 at 1:11 PM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 There are indications that joins in Spark are implemented with / based on
 the
 cogroup function/primitive/transform. So let me focus first on cogroup - it
 returns a result which is RDD consisting of essentially ALL elements of the
 cogrouped RDDs. Said in another way - for every key in each of the
 cogrouped
 RDDs there is at least one element from at least one of the cogrouped RDDs.

 That would mean that when smaller, moreover streaming e.g.
 JavaPairDstreamRDDs keep getting joined with much larger, batch RDD that
 would result in RAM allocated for multiple instances of the result
 (cogrouped) RDD a.k.a essentially the large batch RDD and some more ...
 Obviously the RAM will get returned when the DStream RDDs get discarded, and
 they do on a regular basis, but still that seems an unnecessary spike in
 RAM consumption.

 I have two questions:

 1. Is there any way to control the cogroup process more precisely, e.g. tell
 it to include in the cogrouped RDD only elements where there is at least one
 element from EACH of the cogrouped RDDs per given key? Based on the current
 cogroup API this is not possible.


 2. If the cogroup is really such a sledgehammer, and secondly the joins are
 based on cogroup, then even though they can present a prettier picture in
 terms of the end result visible to the end user, does that mean that under
 the hood there is still the same atrocious RAM consumption going on?




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/RAM-management-during-cogroup-and-join-tp22505.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Is it feasible to keep millions of keys in state of Spark Streaming job for two months?

2015-04-15 Thread Tathagata Das
Can you clarify more on what you want to do after querying? Is the batch
not completed until the querying and subsequent processing has completed?


On Tue, Apr 14, 2015 at 10:36 PM, Krzysztof Zarzycki k.zarzy...@gmail.com
wrote:

 Thank you Tathagata, very helpful answer.

 Though, I would like to highlight that recent stream processing systems
 are trying to help users in implementing use case of holding such large
 (like 2 months of data) states. I would mention here Samza state
 management
 http://samza.apache.org/learn/documentation/0.9/container/state-management.html
  and
 Trident state management
 https://storm.apache.org/documentation/Trident-state. I'm waiting when
 Spark would help with that too, because generally I definitely prefer this
 technology:)

 But considering holding state in Cassandra with Spark Streaming, I
 understand we're not talking here about using Cassandra as input nor output
 (nor make use of spark-cassandra-connector
 https://github.com/datastax/spark-cassandra-connector). We're talking
 here about querying Cassandra from map/mapPartition functions.
 I have one question about it: is it possible to query Cassandra
 asynchronously within Spark Streaming? And while doing so, is it possible
 to take the next batch of rows while the previous one is waiting on Cassandra I/O?
 I think (but I'm not sure) this generally asks whether several consecutive
 windows can interleave (because they take long to process). Let's draw it:

 --|query Cassandra asynchronously---  window1
 --- window2

 While writing it, I start to believe they can, because windows are
 time-triggered, not triggered when previous window has finished... But it's
 better to ask:)




 2015-04-15 2:08 GMT+02:00 Tathagata Das t...@databricks.com:

 Fundamentally, stream processing systems are designed for processing
 streams of data, not for storing large volumes of data for a long period of
 time. So if you have to maintain that much state for months, then its best
 to use another system that is designed for long term storage (like
 Cassandra) which has proper support for making all that state
 fault-tolerant, high-performant, etc. So yes, the best option is to use
 Cassandra for the state and Spark Streaming jobs accessing the state from
 Cassandra. There are a number of optimizations that can be done. Its not
 too hard to build a simple on-demand populated cache (singleton hash map
 for example), that speeds up access from Cassandra, and all updates are
 written through the cache. This is a common use of Spark Streaming +
 Cassandra/HBase.

 Regarding the performance of updateStateByKey, we are aware of the
 limitations, and we will improve it soon :)

 TD


 On Tue, Apr 14, 2015 at 12:34 PM, Krzysztof Zarzycki 
 k.zarzy...@gmail.com wrote:

 Hey guys, could you please help me with a question I asked on
 Stackoverflow:
 https://stackoverflow.com/questions/29635681/is-it-feasible-to-keep-millions-of-keys-in-state-of-spark-streaming-job-for-two
 ?  I'll be really grateful for your help!

 I'm also pasting the question below:

 I'm trying to solve a (simplified here) problem in Spark Streaming:
 Let's say I have a log of events made by users, where each event is a tuple
 (user name, activity, time), e.g.:

 (user1, view, 2015-04-14T21:04Z) (user1, click,
 2015-04-14T21:05Z)

 Now I would like to gather events by user to do some analysis of that.
 Let's say that output is some analysis of:

 (user1, List((view, 2015-04-14T21:04Z),(click,
 2015-04-14T21:05Z))

 The events should be kept for even *2 months*. During that time there
 might be around *500 million* of such events, and *millions of unique* users,
 which are keys here.

 *My questions are:*

- Is it feasible to do such a thing with updateStateByKey on
DStream, when I have millions of keys stored?
- Am I right that DStream.window is no use here, when I have 2
months length window and would like to have a slide of few seconds?

 P.S. I found out that updateStateByKey is called on all the keys on
 every slide, so that means it will be called millions of times every few
 seconds. That makes me doubt this design and I'm rather thinking about
 alternative solutions like:

- using Cassandra for state
- using Trident state (with Cassandra probably)
- using Samza with its state management.






Re: How to do dispatching in Streaming?

2015-04-15 Thread Tathagata Das
It may be worthwhile to architect the computation in a different way.

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    // do different things for each record based on filters
  }
}

TD
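
A sketch of that shape, assuming stream is a DStream[String] and messages
look like "type:payload"; the helpers are hypothetical stand-ins for the
real type extraction and per-type handling:

    def messageType(record: String): String = record.takeWhile(_ != ':')
    def handleClick(record: String): Unit = println(s"click: $record")
    def handleView(record: String): Unit = println(s"view: $record")

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        records.foreach { record =>
          messageType(record) match {  // one pass, no repeated filtering
            case "click" => handleClick(record)
            case "view"  => handleView(record)
            case _       => // ignore unknown message types
          }
        }
      }
    }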

On Sun, Apr 12, 2015 at 7:52 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hi,

 I have a Kafka topic that contains dozens of different types of messages.
 And for each one I'll need to create a DStream.

 Currently I have to filter the Kafka stream over and over, which is very
 inefficient.

 So what's the best way to do dispatching in Spark Streaming? (one DStream
 -> multiple DStreams)


 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/



Re: Is it feasible to keep millions of keys in state of Spark Streaming job for two months?

2015-04-14 Thread Tathagata Das
Fundamentally, stream processing systems are designed for processing
streams of data, not for storing large volumes of data for a long period of
time. So if you have to maintain that much state for months, then its best
to use another system that is designed for long term storage (like
Cassandra) which has proper support for making all that state
fault-tolerant, high-performant, etc. So yes, the best option is to use
Cassandra for the state and Spark Streaming jobs accessing the state from
Cassandra. There are a number of optimizations that can be done. Its not
too hard to build a simple on-demand populated cache (singleton hash map
for example), that speeds up access from Cassandra, and all updates are
written through the cache. This is a common use of Spark Streaming +
Cassandra/HBase.

Regarding the performance of updateStateByKey, we are aware of the
limitations, and we will improve it soon :)

TD
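
A sketch of such an on-demand, write-through cache, one instance per
executor JVM; CassandraClient is a hypothetical stand-in for a real client
session:

    import java.util.concurrent.ConcurrentHashMap

    // Hypothetical stand-in for a real Cassandra session.
    object CassandraClient {
      def read(key: String): String = ???  // replace with a real lookup
      def write(key: String, value: String): Unit = ()  // replace with a real upsert
    }

    object StateCache {
      private val cache = new ConcurrentHashMap[String, String]()

      def get(key: String): String = {
        val cached = cache.get(key)
        if (cached != null) cached
        else {
          val fetched = CassandraClient.read(key)  // populate on demand
          cache.putIfAbsent(key, fetched)
          fetched
        }
      }

      def update(key: String, value: String): Unit = {
        cache.put(key, value)              // keep the hot copy fresh
        CassandraClient.write(key, value)  // write through to the store
      }
    }

Called from inside mapPartitions or foreachRDD, each executor then keeps its
own warm copy of the hot keys.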


On Tue, Apr 14, 2015 at 12:34 PM, Krzysztof Zarzycki k.zarzy...@gmail.com
wrote:

 Hey guys, could you please help me with a question I asked on
 Stackoverflow:
 https://stackoverflow.com/questions/29635681/is-it-feasible-to-keep-millions-of-keys-in-state-of-spark-streaming-job-for-two
 ?  I'll be really grateful for your help!

 I'm also pasting the question below:

 I'm trying to solve a (simplified here) problem in Spark Streaming: Let's
 say I have a log of events made by users, where each event is a tuple (user
 name, activity, time), e.g.:

 (user1, view, 2015-04-14T21:04Z) (user1, click,
 2015-04-14T21:05Z)

 Now I would like to gather events by user to do some analysis of that.
 Let's say that output is some analysis of:

 (user1, List((view, 2015-04-14T21:04Z),(click,
 2015-04-14T21:05Z))

 The events should be kept for even *2 months*. During that time there
 might be around *500 million* of such events, and *millions of unique* users,
 which are keys here.

 *My questions are:*

- Is it feasible to do such a thing with updateStateByKey on DStream,
when I have millions of keys stored?
- Am I right that DStream.window is no use here, when I have 2 months
length window and would like to have a slide of few seconds?

 P.S. I found out that updateStateByKey is called on all the keys on every
 slide, so that means it will be called millions of times every few seconds.
 That makes me doubt this design and I'm rather thinking about
 alternative solutions like:

- using Cassandra for state
- using Trident state (with Cassandra probably)
- using Samza with its state management.




Re: Seeing message about receiver not being de-registered on invoking Streaming context stop

2015-04-14 Thread Tathagata Das
What version of Spark are you using? There was a known bug which could be
causing this. It got fixed in Spark 1.3

TD

On Mon, Apr 13, 2015 at 11:44 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 When you say done fetching documents, does it mean that you are stopping
 the streamingContext? (ssc.stop) or you meant completed fetching documents
 for a batch? If possible, you could paste your custom receiver code so that
 we can have a look at it.

 Thanks
 Best Regards

 On Tue, Apr 7, 2015 at 8:46 AM, Hari Polisetty hpoli...@icloud.com
 wrote:

  My application is running Spark in local mode and  I have a Spark
 Streaming Listener as well as a Custom Receiver. When the receiver is done
 fetching all documents, it invokes “stop” on itself.

 I see the StreamingListener  getting a callback on “onReceiverStopped”
 where I stop the streaming context.


 However, I see the following message in my logs:


 2015-04-06 16:41:51,193 WARN [Thread-66]
 com.amazon.grcs.gapanalysis.spark.streams.ElasticSearchResponseReceiver.onStop
 - Stopped receiver

 2015-04-06 16:41:51,193 ERROR
 [sparkDriver-akka.actor.default-dispatcher-17]
 org.apache.spark.Logging$class.logError - Deregistered receiver for stream
 0: AlHURLEY

 2015-04-06 16:41:51,202 WARN [Executor task launch worker-2]
 org.apache.spark.Logging$class.logWarning - Stopped executor without error

 2015-04-06 16:41:51,203 WARN [StreamingListenerBus]
 org.apache.spark.Logging$class.logWarning - All of the receivers have not
 deregistered, Map(0 -
 ReceiverInfo(0,ElasticSearchResponseReceiver-0,null,false,localhost,HURLEY))


 What am I missing or doing wrong?





Re: sbt-assembly spark-streaming-kinesis-asl error

2015-04-14 Thread Tathagata Das
Have you tried marking only spark-streaming-kinesis-asl as not provided,
and the rest as provided? Then you will not even need to add
kinesis-asl.jar in the spark-submit.

TD

On Tue, Apr 14, 2015 at 2:27 PM, Mike Trienis mike.trie...@orcsol.com
wrote:

 Richard,

 Your response was very helpful and actually resolved my issue. In case
 others run into a similar issue, I followed this procedure:

 - Upgraded to Spark 1.3.0
 - Marked all Spark-related libraries as provided
 - Included the Spark transitive library dependencies

 where my build.sbt file contains:

 libraryDependencies ++= {
   Seq(
     "org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
     "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided",
     "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided",
     "joda-time" % "joda-time" % "2.2",
     "org.joda" % "joda-convert" % "1.2",
     "com.amazonaws" % "aws-java-sdk" % "1.8.3",
     "com.amazonaws" % "amazon-kinesis-client" % "1.2.0")
 }

 and submitting a Spark job can be done via:

 sh ./spark-1.3.0-bin-cdh4/bin/spark-submit --jars
 spark-streaming-kinesis-asl_2.10-1.3.0.jar --verbose --class
 com.xxx.MyClass target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar

 Thanks again Richard!

 Cheers Mike.


 On Tue, Apr 14, 2015 at 11:01 AM, Richard Marscher 
 rmarsc...@localytics.com wrote:

 Hi,

 I've gotten an application working with sbt-assembly and spark, thought
 I'd present an option. In my experience, trying to bundle any of the Spark
 libraries in your uber jar is going to be a major pain. There will be a lot
 of deduplication to work through and even if you resolve them it can be
 easy to do it incorrectly. I considered it an intractable problem. So the
 alternative is to not include those jars in your uber jar. For this to work
 you will need the same libraries on the classpath of your Spark cluster and
 your driver program (if you are running that as an application and not just
 using spark-submit).

 As for your NoClassDefFoundError, you either are missing Joda Time in
 your runtime classpath or have conflicting versions. It looks like
 something related to AWS wants to use it. Check your uber jar to see if its
 including the org/joda/time as well as the classpath of your spark cluster.
 For example: I use the Spark 1.3.0 on Hadoop 1.x, which in the 'lib'
 directory has an uber jar spark-assembly-1.3.0-hadoop1.0.4.jar. At one
 point in Spark 1.2 I found a conflict between httpclient versions that my
 uber jar pulled in for AWS libraries and the one bundled in the spark uber
 jar. I hand patched the spark uber jar to remove the offending httpclient
 bytecode to resolve the issue. You may be facing a similar situation.

 I hope that gives some ideas for resolving your issue.

 Regards,
 Rich

 On Tue, Apr 14, 2015 at 1:14 PM, Mike Trienis mike.trie...@orcsol.com
 wrote:

 Hi Vadim,

 After removing "provided" from "org.apache.spark" %%
 "spark-streaming-kinesis-asl" I ended up with a huge number of deduplicate
 errors:

 https://gist.github.com/trienism/3d6f8d6b7ff5b7cead6a

 It would be nice if you could share some pieces of your mergeStrategy
 code for reference.

 Also, after adding "provided" back to spark-streaming-kinesis-asl,
 I submit the Spark job with the spark-streaming-kinesis-asl jar file:

 sh /usr/lib/spark/bin/spark-submit --verbose --jars
 lib/spark-streaming-kinesis-asl_2.10-1.2.0.jar --class com.xxx.DataConsumer
 target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar

 I still end up with the following error...

 Exception in thread main java.lang.NoClassDefFoundError:
 org/joda/time/format/DateTimeFormat
 at com.amazonaws.auth.AWS4Signer.<clinit>(AWS4Signer.java:44)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at java.lang.Class.newInstance(Class.java:379)

 Has anyone else run into this issue?



 On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 I don't believe the Kinesis asl should be provided. I used
 mergeStrategy successfully to produce an uber jar.

 Fyi, I've been having trouble consuming data out of Kinesis with Spark
 with no success :(
 Would be curious to know if you got it working.

 Vadim

 On Apr 13, 2015, at 9:36 PM, Mike Trienis mike.trie...@orcsol.com
 wrote:

 Hi All,

 I am having trouble building a fat jar file through sbt-assembly.

 [warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename'
 [warn] Merging 'META-INF/NOTICE' with strategy 'rename'
 [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename'
 [warn] Merging 'META-INF/LICENSE' with strategy 'rename'
 [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
 [warn] Merging
 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with
 

Re: not found: value SQLContextSingleton

2015-04-11 Thread Tathagata Das
Have you created a class called SQLContextSingleton? If so, is it on the
compile classpath?
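
For reference, a lazily instantiated singleton along the lines of the
streaming programming guide's DataFrame example; it must live in a file that
is part of the compilation:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    object SQLContextSingleton {
      @transient private var instance: SQLContext = _

      def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
        if (instance == null) instance = new SQLContext(sparkContext)
        instance
      }
    }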

On Fri, Apr 10, 2015 at 6:47 AM, Mukund Ranjan (muranjan) 
muran...@cisco.com wrote:

  Hi All,


  Any idea why I am getting this error?





  wordsTenSeconds.foreachRDD((rdd: RDD[String], time: Time) => {



   val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
   // This line is creating this error



 })



  — E R R O R ——



  [error]
 /Users/muranjan/workspace/kafka/src/main/scala/kafka/KafkaConsumer.scala:103:
 not found: value SQLContextSingleton

 [error]   val sqlContext =
 SQLContextSingleton.getInstance(rdd.sparkContext)

 [error]^

 [error]
 /Users/muranjan/workspace/kafka/src/main/scala/kafka/KafkaConsumer.scala:158:
 not found: value SQLContextSingleton

 [error]   val sqlContext =
 SQLContextSingleton.getInstance(rdd.sparkContext)

 [error]^

 [error] two errors found

 [error] (compile:compileIncremental) Compilation failed



  — E R R O R ——

  Thanks,
 Mukund



Re: coalesce(*, false) problem

2015-04-10 Thread Tathagata Das
Coalesce tries to reduce the number of partitions to a smaller number of
partitions without moving the data around (as much as possible). Since
most of the received data is on a few machines (those running receivers),
coalesce just makes bigger merged partitions on those.

Without coalesce
Machine 1: 10 partitions processing in parallel
Machine 2: 2 partitions processing in parallel

With coalesce
Machine 1: 10 partitions merged into 1 partition processed together taking
10 times longer
Machine 2: 2 partitions merged into 1 partition processed together taking 2
times longer

Hope this clarifies.

TD
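
A sketch of the difference, assuming dstream is the receiver-fed stream and
the partition counts are arbitrary:

    // coalesce with shuffle = false only merges co-located partitions, so a
    // skewed receiver layout stays skewed, just with fewer, bigger partitions.
    val mergedLocally = dstream.transform(_.coalesce(3, shuffle = false))

    // repartition shuffles the records, actually spreading the load across executors.
    val rebalanced = dstream.repartition(12)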

On Fri, Apr 10, 2015 at 5:16 AM, 邓刚[技术中心] triones.d...@vipshop.com wrote:

   Hi All,

  We are running a Spark Streaming application. The data source is
 Kafka; the data partitioning of Kafka is not well-distributed,
 but every receiver on every executor can receive data, just in
 different amounts.

 And our data is very large, so we tried a local repartition with
 coalesce(*, false), but we found an odd appearance.



 Most of the tasks run on one executor. See picture one. When we remove
 the coalesce call, the tasks are distributed better; see picture two.
 Does anyone know why?



 *Picture one*





 *Picture two*





Re: Lookup / Access of master data in spark streaming

2015-04-09 Thread Tathagata Das
Responses inline. Hope they help.

On Thu, Apr 9, 2015 at 8:20 AM, Amit Assudani aassud...@impetus.com wrote:

  Hi Friends,

  I am trying to solve a use case in spark streaming, I need help on
 getting to right approach on lookup / update the master data.

  Use case (simplified):
  I have a dataset of entities with three attributes and an identifier/row key
 in a persistent store.

  Each attribute, along with the row key, comes from a different stream;
 effectively, 3 source streams.

  Now whenever any attribute comes up, I want to update/sync the
 persistent store and do some processing, but the processing would require
 the latest state of the entity with the latest values of all three attributes.

  I wish I had all the entities cached in some sort of centralized
 cache (like we have data in HDFS) within Spark Streaming which could be
 used for data-local processing. But I assume there is no such thing.

  Potential approaches I am thinking of (I suspect the first two are not
 feasible, but I want to confirm):
    1. Are broadcast variables mutable? If yes, can I use one as a cache
 for all entities, sizing around 100s of GBs, provided I have a cluster with
 enough RAM.


Broadcast variables are not mutable. But you can always create a new
broadcast variable when you want and use the latest broadcast variable in
your computation.

dstream.transform { rdd =>
   val latestBroadcast = getLatestBroadcastVariable()  // fetch existing, or update + create new and return
   val transformedRDD = rdd. ...  // use latestBroadcast in RDD transformations
   transformedRDD
}

Since the transform RDD-to-RDD function runs on the driver every batch
interval, it will always use the latest broadcast variable that you want.
Note, though, that whenever you create a new broadcast, the next batch may
take a little longer, as the data needs to be actually broadcast out.
That can also be made asynchronous by running a simple task (to force the
broadcast out) on any new broadcast variable in a different thread from the
Spark Streaming batch schedule, but using the same underlying SparkContext.
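
For illustration, a hedged sketch of what a getLatestBroadcastVariable()
helper could look like; the holder object, the refresh interval, and
loadMasterData() are all hypothetical names for this sketch, not Spark API:

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object BroadcastHolder {
  @volatile private var current: Broadcast[Map[String, String]] = _
  private var lastUpdated = 0L
  private val refreshIntervalMs = 5 * 60 * 1000L  // hypothetical refresh period

  // Hypothetical loader; replace with the real master-data lookup.
  private def loadMasterData(): Map[String, String] = Map.empty

  def getLatestBroadcastVariable(sc: SparkContext): Broadcast[Map[String, String]] =
    synchronized {
      val now = System.currentTimeMillis()
      if (current == null || now - lastUpdated > refreshIntervalMs) {
        if (current != null) current.unpersist()  // drop the stale copy on executors
        current = sc.broadcast(loadMasterData())
        lastUpdated = now
      }
      current
    }
}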




2. Is there any kind of sticky partitioning possible, so that I can route my
stream data through the same node where I have the corresponding
entities (a subset of the entire store) cached in memory within the JVM / off
heap on the node? This would avoid lookups from the store.

 You could use updateStateByKey. That is quite sticky, but it does not
eliminate the possibility that the task runs on a different node. In fact this
is necessary for fault tolerance: what if the node it was supposed to run on
goes down? The task will be run on a different node, and you have to
 design your application such that it can handle that.


3. If I stream the entities from the persistent store into the engine, this
becomes a 4th stream, the entity stream. How do I use join/merge to enable
streams 1, 2 and 3 to look up and update the data from stream 4? Would
DStream.join work for a few seconds' worth of data in the attribute streams
against all the data in the entity stream? Or do I use transform and, within
that, an RDD join? I have doubts whether I am leaning towards a core-Spark
approach within Spark Streaming.


Depends on what kind of join! If you want to join every batch in a stream
with a static dataset (or a rarely updated one), then transform+join is
the way to go. If you want to join one stream with a window of data from
another stream, then DStream.join is the way to go.
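
A minimal sketch of the transform+join variant, assuming a keyed attribute
DStream named `events` and a cached pair RDD `masterRdd` loaded from the
store; `Entity` and `Attribute` are placeholder types for this sketch:

// masterRdd: RDD[(String, Entity)], cached; events: DStream[(String, Attribute)]
// (On Spark 1.2, add `import org.apache.spark.SparkContext._` for the pair functions.)
val joined = events.transform { rdd =>
  rdd.join(masterRdd)  // per-batch RDD-to-RDD join against the static dataset
}
// joined: DStream[(String, (Attribute, Entity))]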


4. The last approach, which I think will surely work but want to avoid, is to
keep the entities in an IMDB and do lookup/update calls from
streams 1, 2 and 3.


   Any help is deeply appreciated, as this would help me design my system
 efficiently, and the solution approach may become a beacon for
 lookup-of-master-data sorts of problems.

  Regards,
  Amit

 --









Re: Could not compute split, block not found in Spark Streaming Simple Application

2015-04-09 Thread Tathagata Das
Are you running # of receivers = # machines?

TD

On Thu, Apr 9, 2015 at 9:56 AM, Saiph Kappa saiph.ka...@gmail.com wrote:

 Sorry, I was getting those errors because my workload was not sustainable.

 However, I noticed that, by just running the spark-streaming-benchmark (
 https://github.com/tdas/spark-streaming-benchmark/blob/master/Benchmark.scala
 ), I get no difference on the execution time, number of processed records,
 and delay whether I'm using 1 machine or 2 machines with the setup
 described before (using spark standalone). Is it normal?



 On Fri, Mar 27, 2015 at 5:32 PM, Tathagata Das t...@databricks.com
 wrote:

 If it is deterministically reproducible, could you generate full DEBUG
 level logs, from the driver and the workers and give it to me? Basically I
 want to trace through what is happening to the block that is not being
 found.
 And can you tell me which cluster manager you are using? Spark Standalone,
 Mesos or YARN?


 On Fri, Mar 27, 2015 at 10:09 AM, Saiph Kappa saiph.ka...@gmail.com
 wrote:

 Hi,

 I am just running this simple example with
 machineA: 1 master + 1 worker
 machineB: 1 worker
 «
 val ssc = new StreamingContext(sparkConf, Duration(1000))

 val rawStreams = (1 to numStreams).map(_ =>
   ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray
 val union = ssc.union(rawStreams)

 union.filter(line => Random.nextInt(1) == 0).map(line => {
   var sum = BigInt(0)
   line.toCharArray.foreach(chr => sum += chr.toInt)
   fib2(sum)
   sum
 }).reduceByWindow(_ + _, Seconds(1), Seconds(1)).map(s => s"### result: $s").print()
 »

 And I'm getting the following exceptions:

 Log from machineB
 «
 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task
 132
 15/03/27 16:21:35 INFO Executor: Running task 0.0 in stage 27.0 (TID 132)
 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task
 134
 15/03/27 16:21:35 INFO Executor: Running task 2.0 in stage 27.0 (TID 134)
 15/03/27 16:21:35 INFO TorrentBroadcast: Started reading broadcast
 variable 24
 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task
 136
 15/03/27 16:21:35 INFO Executor: Running task 4.0 in stage 27.0 (TID 136)
 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task
 138
 15/03/27 16:21:35 INFO Executor: Running task 6.0 in stage 27.0 (TID 138)
 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task
 140
 15/03/27 16:21:35 INFO Executor: Running task 8.0 in stage 27.0 (TID 140)
 15/03/27 16:21:35 INFO MemoryStore: ensureFreeSpace(1886) called with
 curMem=47117, maxMem=280248975
 15/03/27 16:21:35 INFO MemoryStore: Block broadcast_24_piece0 stored as
 bytes in memory (estimated size 1886.0 B, free 267.2 MB)
 15/03/27 16:21:35 INFO BlockManagerMaster: Updated info of block
 broadcast_24_piece0
 15/03/27 16:21:35 INFO TorrentBroadcast: Reading broadcast variable 24
 took 19 ms
 15/03/27 16:21:35 INFO MemoryStore: ensureFreeSpace(3104) called with
 curMem=49003, maxMem=280248975
 15/03/27 16:21:35 INFO MemoryStore: Block broadcast_24 stored as values
 in memory (estimated size 3.0 KB, free 267.2 MB)
 15/03/27 16:21:35 ERROR Executor: Exception in task 8.0 in stage 27.0
 (TID 140)
 java.lang.Exception: Could not compute split, block
 input-0-1427473262420 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:701)
 15/03/27 16:21:35 ERROR Executor: Exception in task 6.0 in stage 27.0
 (TID 138)
 java.lang.Exception: Could not compute split, block
 input-0-1427473262418 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51

Re: Continuous WARN messages from BlockManager about block replication

2015-04-09 Thread Tathagata Das
Well, you are running in local mode, so it cannot find another peer to
replicate the blocks received from receivers. That's all it is. It's not a
real concern, and that error will go away when you run it in a cluster.
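
If you want to silence it anyway, one workaround (my assumption, not
something verified in this thread) is to request a single-replica storage
level when creating the stream in local mode, so no replication is
attempted, e.g.:

import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK (no trailing _2) asks for one replica only, so the
// "replicated to only 0 peer(s)" warning has nothing to complain about.
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK)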

On Thu, Apr 9, 2015 at 11:24 AM, Nandan Tammineedi nan...@defend7.com
wrote:

 Hi,

 I'm running a spark streaming job in local mode (--master local[4]), and
 I'm seeing tons of these messages, roughly once every second -

 WARN BlockManager: Block input-0-1428527584600 replicated to only 0
 peer(s) instead of 1 peers

 We're using spark 1.2.1. Even with TRACE logging enabled, we're not seeing
 any log messages indicating failure to replicate the blocks.

 Should we be concerned about this warning (and if so, how should we debug
 this), or is this a corner case in local mode where replication is not
 attempted, but the warning is emitted anyway? If so, what is the workaround?

 thanks

 Nandan



Re: Timeout errors from Akka in Spark 1.2.1

2015-04-08 Thread Tathagata Das
There are a couple of options. Increase the timeout (see Spark configuration).

Also see past mails in the mailing list.

Another option you may try (I have a gut feeling it may work, but I am not
sure) is calling GC on the driver periodically. The cleanup of this stuff is
tied to GC of the RDD objects, and regular GC may help keep things
clean more rigorously, rather than in unpredictable bursts of GC activity.
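
For the first option, the 30-second timeout in the stack trace matches the
default of spark.akka.askTimeout in Spark 1.2; a sketch of raising it, plus a
periodic-GC helper for the second suggestion (the app name and the 10-minute
interval are arbitrary choices for this sketch):

import java.util.concurrent.{Executors, TimeUnit}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("flume-ingest")           // hypothetical app name
  .set("spark.akka.askTimeout", "120")  // seconds; default is 30

// Periodically force a GC on the driver so RDD/shuffle cleanup happens
// regularly instead of in unpredictable bursts.
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
  new Runnable { override def run(): Unit = System.gc() },
  10, 10, TimeUnit.MINUTES)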

Let us know how it works out.

TD

On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal nb.nos...@gmail.com wrote:

 I have a standalone and local Spark streaming process where we are reading
 inputs using FlumeUtils. Our longest window size is 6 hours. After about a
 day and a half of running without any issues, we start seeing Timeout
 errors while cleaning up input blocks. This seems to cause reading from
 Flume to cease.


 ERROR sparkDriver-akka.actor.default-dispatcher-78
 BlockManagerSlaveActor.logError - Error in removing block
 input-0-1428182594000
 org.apache.spark.SparkException: Error sending message [message =
 UpdateBlockInfo(BlockManagerId(driver, localhost,
 55067),input-0-1428182594000,StorageLevel(false, false, false, false,
 1),0,0,0)]
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201)
 at
 org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
 at
 org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
 at org.apache.spark.storage.BlockManager.org
 $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385)
 at
 org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361)
 at
 org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76)
 at
 scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
 at
 scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
 at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.util.concurrent.TimeoutException: Futures timed out after
 [30 seconds]
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
 at
 scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
 at
 akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
 at scala.concurrent.Await$.result(package.scala:107)
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
 ... 17 more

 There was a similar query posted here
 http://apache-spark-user-list.1001560.n3.nabble.com/Block-removal-causes-Akka-timeouts-td15632.html
 but did not find any resolution to that issue.


 Thanks in advance,
 NB




Re: Timeout errors from Akka in Spark 1.2.1

2015-04-08 Thread Tathagata Das
Yes, in local mode the driver and executors will be in the same process.
And in that case the executor Java options in the SparkConf configuration
will not take effect.

On Wed, Apr 8, 2015 at 1:44 PM, N B nb.nos...@gmail.com wrote:

 Since we are running in local mode, won't all the executors be in the same
 JVM as the driver?

 Thanks
 NB

 On Wed, Apr 8, 2015 at 1:29 PM, Tathagata Das t...@databricks.com wrote:

It does take effect on the executors, not on the driver. Which is okay,
because executors have all the data and therefore have GC issues, while the
driver usually does not. If you want to be doubly sure, print the JVM flags
(e.g. http://stackoverflow.com/questions/10486375/print-all-jvm-flags)

However, the GC I was referring to, the one that initiates the RDD and shuffle
cleanup, was the GC on the driver. Thought I would clarify.

 TD

 On Wed, Apr 8, 2015 at 1:23 PM, N B nb.nos...@gmail.com wrote:

 Hi TD,

 Thanks for the response. Since you mentioned GC, this got me thinking.

 Given that we are running in local mode (all in a single JVM) for now,
 does the option spark.executor.extraJavaOptions set to
 -XX:+UseConcMarkSweepGC inside SparkConf object take effect at all before
 we use it to create the StreamingContext? I ask because that is what we are
 doing right now. If not, perhaps we have not been running with the
 Concurrent Mark Sweep at all and is that recommended instead of forcing GC
 periodically?

 Thanks
 NB


 On Wed, Apr 8, 2015 at 10:20 AM, Tathagata Das t...@databricks.com
 wrote:

 There are a couple of options. Increase the timeout (see Spark
 configuration).

 Also see past mails in the mailing list.

 Another option you may try (I have a gut feeling it may work, but I am
 not sure) is calling GC on the driver periodically. The cleanup of this
 stuff is tied to GC of the RDD objects, and regular GC may help keep
 things clean more rigorously, rather than in unpredictable bursts of GC
 activity.

 Let us know how it works out.

 TD

 On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal nb.nos...@gmail.com
 wrote:

 I have a standalone and local Spark streaming process where we are
 reading inputs using FlumeUtils. Our longest window size is 6 hours. After
 about a day and a half of running without any issues, we start seeing
 Timeout errors while cleaning up input blocks. This seems to cause reading
 from Flume to cease.


 ERROR sparkDriver-akka.actor.default-dispatcher-78
 BlockManagerSlaveActor.logError - Error in removing block
 input-0-1428182594000
 org.apache.spark.SparkException: Error sending message [message =
 UpdateBlockInfo(BlockManagerId(driver, localhost,
 55067),input-0-1428182594000,StorageLevel(false, false, false, false,
 1),0,0,0)]
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201)
 at
 org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
 at
 org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
 at org.apache.spark.storage.BlockManager.org
 $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385)
 at
 org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361)
 at
 org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76)
 at
 scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
 at
 scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
 at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.util.concurrent.TimeoutException: Futures timed out
 after [30 seconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
 at
 scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640

Re: Empty RDD?

2015-04-08 Thread Tathagata Das
Aah yes. The jsonRDD method needs to walk through the whole RDD to
understand the schema, and does not work if there is no data in it. Making
sure there is data in it using take(1) should work.

TD


Re: Empty RDD?

2015-04-08 Thread Tathagata Das
What is the computation you are doing in the foreachRDD that is throwing
the exception?
One way to guard against it is to do a take(1) to see if you get back any
data. If there is none, then don't do anything with the RDD.
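
A minimal sketch of that guard, assuming a DStream named `dstream`:

dstream.foreachRDD { rdd =>
  // take(1) is cheap: it stops as soon as one element is found.
  if (rdd.take(1).nonEmpty) {
    // ... process the non-empty batch ...
  }
}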

TD

On Wed, Apr 8, 2015 at 1:08 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com
 wrote:

 When I call *transform* or *foreachRDD *on* DStream*, I keep getting an
 error that I have an empty RDD, which makes sense since my batch interval
 may be shorter than the interval at which new data arrive. How do I guard
 against it?

 Thanks,
 Vadim



Re: Spark + Kinesis

2015-04-06 Thread Tathagata Das
 ..., batchInterval)
 /* Kinesis checkpoint interval. Same as batchInterval for this example. */
 val kinesisCheckpointInterval = batchInterval
 /* Create the same number of Kinesis DStreams/Receivers as the Kinesis stream's shards */
 val kinesisStreams = (0 until numStreams).map { i =>
   KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
     InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
 }
 /* Union all the streams */
 val unionStreams = ssc.union(kinesisStreams).map(byteArray => new String(byteArray))
 unionStreams.print()
 ssc.start()
 ssc.awaitTermination()
  }
 }


 On Fri, Apr 3, 2015 at 3:48 PM, Tathagata Das t...@databricks.com
 wrote:

 Just remove "provided" for spark-streaming-kinesis-asl:

 libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"
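
In context, the dependency block would then look roughly like this (a sketch
with quotes restored, not anyone's verbatim file):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                  % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming"             % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"  // not provided
)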

 On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 Thanks. So how do I fix it?

 On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan jonat...@amazon.com
 wrote:

   spark-streaming-kinesis-asl is not part of the Spark distribution
 on your cluster, so you cannot have it be just a provided dependency.
 This is also why the KCL and its dependencies were not included in the
 assembly (but yes, they should be).


  ~ Jonathan Kelly

   From: Vadim Bichutskiy vadim.bichuts...@gmail.com
 Date: Friday, April 3, 2015 at 12:26 PM
 To: Jonathan Kelly jonat...@amazon.com
 Cc: user@spark.apache.org user@spark.apache.org
 Subject: Re: Spark + Kinesis

   Hi all,

  Good news! I was able to create a Kinesis consumer and assemble it
 into an uber jar following
 http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
 and example
 https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
 .

  However when I try to spark-submit it I get the following exception:

  *Exception in thread main java.lang.NoClassDefFoundError:
 com/amazonaws/auth/AWSCredentialsProvider*

  Do I need to include KCL dependency in *build.sbt*, here's what it
 looks like currently:

  import AssemblyKeys._
 name := "Kinesis Consumer"
 version := "1.0"
 organization := "com.myconsumer"
 scalaVersion := "2.11.5"

  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
 libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
 libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided"

  assemblySettings
 jarName in assembly := "consumer-assembly.jar"
 assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

  Any help appreciated.

  Thanks,
 Vadim

 On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan jonat...@amazon.com
 wrote:

  It looks like you're attempting to mix Scala versions, so that's
 going to cause some problems.  If you really want to use Scala 2.11.5, 
 you
 must also use Spark package versions built for Scala 2.11 rather than
 2.10.  Anyway, that's not quite the correct way to specify Scala
 dependencies in build.sbt.  Instead of placing the Scala version after 
 the
 artifactId (like spark-core_2.10), what you actually want is to use 
 just
 spark-core with two percent signs before it.  Using two percent signs
 will make it use the version of Scala that matches your declared
 scalaVersion.  For example:

  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"

  libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"

  libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

  I think that may get you a little closer, though I think you're
 probably going to run into the same problems I ran into in this thread:
 https://www.mail-archive.com/user@spark.apache.org/msg23891.html  I
 never really got an answer for that, and I temporarily moved on to other
 things for now.


  ~ Jonathan Kelly

   From: 'Vadim Bichutskiy' vadim.bichuts...@gmail.com
 Date: Thursday, April 2, 2015 at 9:53 AM
 To: user@spark.apache.org user@spark.apache.org
 Subject: Spark + Kinesis

   Hi all,

  I am trying to write an Amazon Kinesis consumer Scala app that
 processes data in the
 Kinesis stream. Is this the correct way to specify *build.sbt*:

  ---
 import AssemblyKeys._
 name := "Kinesis Consumer"

 version := "1.0"

Re: WordCount example

2015-04-06 Thread Tathagata Das
There are no workers registered with the Spark Standalone master! That is
the crux of the problem. :)
Follow the instructions properly -
https://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts
Especially make sure the conf/slaves file has the intended workers listed.
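
For reference, a minimal sketch of that setup (host names below are
placeholders, not from this thread):

# conf/slaves on the master machine: one worker host per line
worker-host-1
worker-host-2

# then launch the master and all listed workers:
sbin/start-all.sh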

TD

On Mon, Apr 6, 2015 at 9:55 AM, Mohit Anchlia mohitanch...@gmail.com
wrote:

 Interesting, I see 0 cores in the UI?


- *Cores:* 0 Total, 0 Used


 On Fri, Apr 3, 2015 at 2:55 PM, Tathagata Das t...@databricks.com wrote:

 What does the Spark Standalone UI at port 8080 say about number of cores?

 On Fri, Apr 3, 2015 at 2:53 PM, Mohit Anchlia mohitanch...@gmail.com
 wrote:

 [ec2-user@ip-10-241-251-232 s_lib]$ cat /proc/cpuinfo |grep process
 processor   : 0
 processor   : 1
 processor   : 2
 processor   : 3
 processor   : 4
 processor   : 5
 processor   : 6
 processor   : 7

 On Fri, Apr 3, 2015 at 2:33 PM, Tathagata Das t...@databricks.com
 wrote:

 How many cores are present in the workers allocated to the standalone
 cluster spark://ip-10-241-251-232:7077 ?


 On Fri, Apr 3, 2015 at 2:18 PM, Mohit Anchlia mohitanch...@gmail.com
 wrote:

 If I use local[2] instead of *URL:* spark://ip-10-241-251-232:7077,
 this seems to work. I don't understand why, though, because when I
 give spark://ip-10-241-251-232:7077 the application seems to bootstrap
 successfully; it just doesn't create a socket on the port?


 On Fri, Mar 27, 2015 at 10:55 AM, Mohit Anchlia 
 mohitanch...@gmail.com wrote:

 I checked the ports using netstat and don't see any connections
 established on that port. Logs show only this:

 15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount
 15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with
 ID app-20150327135048-0002

 Spark ui shows:

 Running Applications:
 ID: app-20150327135048-0002
 (http://54.69.225.94:8080/app?appId=app-20150327135048-0002)
 Name: NetworkWordCount
 (http://ip-10-241-251-232.us-west-2.compute.internal:4040/)
 Cores: 0 | Memory per Node: 512.0 MB | Submitted: 2015/03/27 13:50:48 |
 User: ec2-user | State: WAITING | Duration: 33 s
 Code looks like is being executed:

 java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077

 public static void doWork(String masterUrl) {

     SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount");

     JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

     JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", /* port elided in original */);

     System.out.println("Successfully created connection");

     mapAndReduce(lines);

     jssc.start();            // Start the computation

     jssc.awaitTermination(); // Wait for the computation to terminate

 }

 public static void main(String ...args) {

     doWork(args[0]);

 }
 And output of the java program after submitting the task:

 java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077
 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to:
 ec2-user
 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to:
 ec2-user
 15/03/27 13:50:46 INFO SecurityManager: SecurityManager:
 authentication disabled; ui acls disabled; users with view permissions:
 Set(ec2-user); users with modify permissions: Set(ec2-user)
 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started
 15/03/27 13:50:46 INFO Remoting: Starting remoting
 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on
 addresses
 :[akka.tcp://sparkdri...@ip-10-241-251-232.us-west-2.compute.internal
 :60184]
 15/03/27 13:50:47 INFO Utils: Successfully started service
 'sparkDriver' on port 60184.
 15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker
 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster
 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at
 /tmp/spark-local-20150327135047-5399
 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity
 3.5 GB
 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is
 /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b
 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server
 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file
 server' on port 57955.
 15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI'
 on port 4040.
 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at
 http://ip-10-241-251-232.us-west-2.compute.internal:4040
 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master
 spark://ip-10-241-251-232:7077...
 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to
 Spark cluster with app ID app-20150327135048-0002
 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on
 58358
 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register

Re: How to restrict foreach on a streaming RDD only once upon receiver completion

2015-04-06 Thread Tathagata Das
So you want to sort based on the total count of all the records
received through the receiver? In that case, you have to combine all the
counts using updateStateByKey (
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
).
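
A minimal sketch of that combination, assuming a per-batch count DStream
named `counts` keyed by name:

// counts: DStream[(String, Long)] of per-batch counts
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")  // state needs checkpointing; path is hypothetical
val totals = counts.updateStateByKey[Long] { (batchCounts: Seq[Long], state: Option[Long]) =>
  Some(state.getOrElse(0L) + batchCounts.sum)  // running total per key
}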
But stepping back: if you want to get the final results at the end of
receiving all the data (as opposed to continuously), why are you even using
streaming? You could create a custom RDD that reads from Elasticsearch and
then use it in a Spark program. I think that's more natural, as your
application is more batch-like than streaming-like, since you are not using
the results in real-time.

TD

On Mon, Apr 6, 2015 at 12:31 PM, Hari Polisetty hpoli...@icloud.com wrote:

 I have created a Custom Receiver to fetch records pertaining to a specific
 query from Elastic Search and have implemented Streaming RDD
 transformations to process the data generated by the receiver.

 The final RDD is a sorted list of name value pairs and I want to read the
 top 20 results programmatically rather than write to an external file.
 I use foreach on the RDD and take the top 20 values into a list. I see
 that the foreach is processed every time there is a new microbatch from the
 receiver.

 However, I want the foreach computation to be done only once when the
 receiver has finished fetching all the records from Elastic Search and
 before the streaming context is killed so that I can populate the results
 into a list and process it in my driver program.

 Appreciate any guidance in this regard.
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Application Stages and DAG

2015-04-03 Thread Tathagata Das
What he meant is that you should look it up in the Spark UI, specifically in
the Stage tab, to see what is taking so long. And yes, a code snippet helps
us debug.
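
One programmatic complement (standard RDD API, though not mentioned in the
thread): RDD.toDebugString prints the lineage behind an RDD, which
approximates the execution DAG. A hypothetical pipeline, just to show the
output shape:

val counts = sc.textFile("input.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

println(counts.toDebugString)  // prints the chain of RDDs and shuffle boundaries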

TD

On Fri, Apr 3, 2015 at 12:47 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 You need to open the page of the Stage that is taking time, and see how
 long it is spending on GC etc. It would also be good to post that Stage's
 and its previous transformations' code snippets to help us understand it
 better.

 Thanks
 Best Regards

 On Fri, Apr 3, 2015 at 1:05 PM, Vijay Innamuri vijay.innam...@gmail.com
 wrote:


 When I run the Spark application (streaming) in local mode, I can see
 the execution progress as below:

 [Stage 0:                                          (1817 + 1) / 3125]
 [Stage 2: ===                                      (740 + 1) / 3125]

 One of the stages is taking a long time to execute.

 How do I find the transformations/actions associated with a particular
 stage?
 Is there any way to find the execution DAG of a Spark application?

 Regards
 Vijay





Re: About Waiting batches on the spark streaming UI

2015-04-03 Thread Tathagata Das
Very good question! This is because the current code is written such that
the UI considers a batch as waiting only when it has actually started being
processed. That is, batches waiting in the job queue are not counted in the
calculation. It is arguable that it may be more intuitive to count those in
the waiting as well.
On Apr 3, 2015 12:59 AM, bit1...@163.com bit1...@163.com wrote:


 I copied the following from the Spark Streaming UI. I don't know why the
 number of waiting batches is 1; my understanding is that it should be 72.
 The following is my reasoning:
 1. Total time is 1 minute 35 seconds = 95 seconds.
 2. The batch interval is 1 second, so 95 batches are generated in 95 seconds.
 3. Processed batches are 23 (correct, because my processing code does
 nothing but sleep 4 seconds).
 4. Then the waiting batches should be 95 - 23 = 72.



- *Started at: * Fri Apr 03 15:17:47 CST 2015
- *Time since start: *1 minute 35 seconds
- *Network receivers: *1
- *Batch interval: *1 second
- *Processed batches: *23
- *Waiting batches: *1
- *Received records: *0
- *Processed records: *0


 --
 bit1...@163.com



Re: About Waiting batches on the spark streaming UI

2015-04-03 Thread Tathagata Das
Maybe that should be marked as waiting as well. Will keep that in mind. We
plan to update the ui soon, so will keep that in mind.
On Apr 3, 2015 10:12 AM, Ted Yu yuzhih...@gmail.com wrote:

 Maybe add another stat for batches waiting in the job queue ?

 Cheers

 On Fri, Apr 3, 2015 at 10:01 AM, Tathagata Das t...@databricks.com
 wrote:

 Very good question! This is because the current code is written such that
 the UI considers a batch as waiting only when it has actually started being
 processed. That is, batches waiting in the job queue are not counted in the
 calculation. It is arguable that it may be more intuitive to count those in
 the waiting as well.
 On Apr 3, 2015 12:59 AM, bit1...@163.com bit1...@163.com wrote:


 I copied the following from the Spark Streaming UI. I don't know why the
 number of waiting batches is 1; my understanding is that it should be 72.
 The following is my reasoning:
 1. Total time is 1 minute 35 seconds = 95 seconds.
 2. The batch interval is 1 second, so 95 batches are generated in 95
 seconds.
 3. Processed batches are 23 (correct, because my processing code does
 nothing but sleep 4 seconds).
 4. Then the waiting batches should be 95 - 23 = 72.



- *Started at: * Fri Apr 03 15:17:47 CST 2015
- *Time since start: *1 minute 35 seconds
- *Network receivers: *1
- *Batch interval: *1 second
- *Processed batches: *23
- *Waiting batches: *1
- *Received records: *0
- *Processed records: *0


 --
 bit1...@163.com





Re: WordCount example

2015-04-03 Thread Tathagata Das
What does the Spark Standalone UI at port 8080 say about number of cores?

On Fri, Apr 3, 2015 at 2:53 PM, Mohit Anchlia mohitanch...@gmail.com
wrote:

 [ec2-user@ip-10-241-251-232 s_lib]$ cat /proc/cpuinfo |grep process
 processor   : 0
 processor   : 1
 processor   : 2
 processor   : 3
 processor   : 4
 processor   : 5
 processor   : 6
 processor   : 7

 On Fri, Apr 3, 2015 at 2:33 PM, Tathagata Das t...@databricks.com wrote:

 How many cores are present in the workers allocated to the standalone
 cluster spark://ip-10-241-251-232:7077 ?


 On Fri, Apr 3, 2015 at 2:18 PM, Mohit Anchlia mohitanch...@gmail.com
 wrote:

 If I use local[2] instead of *URL:* spark://ip-10-241-251-232:7077, this
 seems to work. I don't understand why, though, because when I
 give spark://ip-10-241-251-232:7077 the application seems to bootstrap
 successfully; it just doesn't create a socket on the port?


 On Fri, Mar 27, 2015 at 10:55 AM, Mohit Anchlia mohitanch...@gmail.com
 wrote:

 I checked the ports using netstat and don't see any connections
 established on that port. Logs show only this:

 15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount
 15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with ID
 app-20150327135048-0002

 Spark ui shows:

 Running Applications:
 ID: app-20150327135048-0002
 (http://54.69.225.94:8080/app?appId=app-20150327135048-0002)
 Name: NetworkWordCount
 (http://ip-10-241-251-232.us-west-2.compute.internal:4040/)
 Cores: 0 | Memory per Node: 512.0 MB | Submitted: 2015/03/27 13:50:48 |
 User: ec2-user | State: WAITING | Duration: 33 s
 Code looks like is being executed:

 java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077

 public static void doWork(String masterUrl) {

     SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount");

     JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

     JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", /* port elided in original */);

     System.out.println("Successfully created connection");

     mapAndReduce(lines);

     jssc.start();            // Start the computation

     jssc.awaitTermination(); // Wait for the computation to terminate

 }

 public static void main(String ...args) {

     doWork(args[0]);

 }
 And output of the java program after submitting the task:

 java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077
 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to: ec2-user
 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to:
 ec2-user
 15/03/27 13:50:46 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(ec2-user);
 users with modify permissions: Set(ec2-user)
 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started
 15/03/27 13:50:46 INFO Remoting: Starting remoting
 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on
 addresses
 :[akka.tcp://sparkdri...@ip-10-241-251-232.us-west-2.compute.internal
 :60184]
 15/03/27 13:50:47 INFO Utils: Successfully started service
 'sparkDriver' on port 60184.
 15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker
 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster
 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at
 /tmp/spark-local-20150327135047-5399
 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity
 3.5 GB
 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is
 /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b
 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server
 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file
 server' on port 57955.
 15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI' on
 port 4040.
 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at
 http://ip-10-241-251-232.us-west-2.compute.internal:4040
 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master
 spark://ip-10-241-251-232:7077...
 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to Spark
 cluster with app ID app-20150327135048-0002
 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on
 58358
 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register
 BlockManager
 15/03/27 13:50:48 INFO BlockManagerMasterActor: Registering block
 manager ip-10-241-251-232.us-west-2.compute.internal:58358 with 3.5 GB RAM,
 BlockManagerId(driver, ip-10-241-251-232.us-west-2.compute.internal,
 58358)
 15/03/27 13:50:48 INFO BlockManagerMaster: Registered BlockManager
 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: SchedulerBackend is
 ready for scheduling beginning after reached minRegisteredResourcesRatio:
 0.0
 15/03/27 13:50:48 INFO ReceiverTracker: ReceiverTracker started
 15
