Re: jars are not loading from 1.3. those set via setJars to the SparkContext
Do you have the Kafka producer in your classpath? If so, how are you adding that library? Are you running on YARN, Mesos, Standalone or local? These details will be very useful.

On Mon, Jun 22, 2015 at 8:34 AM, Murthy Chelankuri kmurt...@gmail.com wrote:
I am using Spark Streaming. What I am trying to do is send a few messages to a Kafka topic, and that is where it fails:

java.lang.ClassNotFoundException: com.abc.mq.msg.ObjectEncoder
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at kafka.utils.Utils$.createObject(Utils.scala:438)
    at kafka.producer.Producer.<init>(Producer.scala:61)

On Mon, Jun 22, 2015 at 8:24 PM, Murthy Chelankuri kmurt...@gmail.com wrote:
I have been using Spark for the last 6 months with version 1.2.0. I am trying to migrate to 1.3.0, but the same program I have written is not working. It gives a class-not-found error when I try to load some dependent jars from the main program. This used to work in 1.2.0 when I set the array of all dependent jars on the SparkContext, but it does not work in 1.3.0. Please help me resolve this.
Thanks,
Murthy Chelankuri
Re: Serial batching with Spark Streaming
No, it does not. By default, batch X+1 is started only after everything related to batch X, including retries, is done. Yes, one RDD per batch per DStream. However, that RDD could be a union of multiple RDDs (e.g. RDDs generated by a windowed DStream, or a unioned DStream).
TD

On Fri, Jun 19, 2015 at 3:16 PM, Michal Čizmazia mici...@gmail.com wrote:
Thanks Tathagata! I will use *foreachRDD*/*foreachPartition*() instead of *transform*() then. Does the default scheduler initiate the execution of *batch X+1* after *batch X* even if tasks for *batch X* need to be *retried due to failures*? If not, could you please suggest workarounds and point me to the code?
One more thing was not 100% clear to me from the documentation: is there exactly *1 RDD* published *per batch interval* in a DStream?

On 19 June 2015 at 16:58, Tathagata Das t...@databricks.com wrote:
I see what the problem is. You are adding a sleep in the transform operation. The transform function is called at the time of preparing the Spark jobs for a batch. It should not run any time-consuming operation like an RDD action or a sleep. Since this operation needs to run every batch interval, a blocking, long-running operation interferes with that. I will try to make this clearer in the guide. I had not seen anyone do something like this before, and therefore it did not occur to me that this could happen. As long as you don't do a time-consuming blocking operation in the transform function, the batches will be generated, scheduled and executed in serial order by default.

On Fri, Jun 19, 2015 at 11:33 AM, Michal Čizmazia mici...@gmail.com wrote:
Binh, thank you very much for your comment and code. Could you please outline an example use of your stream? I am a newbie to Spark. Thanks again!
On 18 June 2015 at 14:29, Binh Nguyen Van binhn...@gmail.com wrote:
I haven't tried with 1.4, but I tried with 1.3 a while ago and could not get the serialized behavior using the default scheduler when there are failures and retries, so I created a customized stream like this:

class EachSeqRDD[T: ClassTag] (
    parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
  ) extends DStream[Unit](parent.ssc) {

  override def slideDuration: Duration = parent.slideDuration

  override def dependencies: List[DStream[_]] = List(parent)

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override private[streaming] def generateJob(time: Time): Option[Job] = {
    val pendingJobs = ssc.scheduler.getPendingTimes().size
    logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
    // do not generate new RDD if there is pending job
    if (pendingJobs == 0) {
      parent.getOrCompute(time) match {
        case Some(rdd) => {
          val jobFunc = () => {
            ssc.sparkContext.setCallSite(creationSite)
            eachSeqFunc(rdd, time)
          }
          Some(new Job(time, jobFunc))
        }
        case None => None
      }
    } else {
      None
    }
  }
}

object DStreamEx {
  implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
    def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
      // because the DStream is reachable from the outer object here, and because
      // DStreams can't be serialized with closures, we can't proactively check
      // it for serializability and so we pass the optional false to SparkContext.clean
      new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
    }
  }
}

-Binh

On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia mici...@gmail.com wrote:
Tathagata, thanks for your response. You are right! Everything seems to work as expected. Could you please help me understand why the time for processing all jobs for a batch is always less than 4 seconds? Please see my playground code below.
The last modified time of the input (lines) RDD dump files seems to match the Thread.sleep delays (20s or 5s) in the transform operation or the batch interval (10s): 20s, 5s, 10s. However, neither the batch processing time in the Streaming tab nor the last modified time of the output (words) RDD dump files reflects the Thread.sleep delays.

07:20  3240 001_lines_...
07:21   117 001_words_...
07:41 37224 002_lines_...
07:43   252 002_words_...
08:00 37728 003_lines_...
08:02   504 003_words_...
08:20 38952 004_lines_...
08:22   756 004_words_...
08:40 38664 005_lines_...
08:42   999 005_words_...
08:45 38160 006_lines_...
08:47  1134 006_words_...
08:50  9720 007_lines_...
08:51  1260 007_words_...
08:55  9864 008_lines_...
08:56  1260 008_words_...
09:00 10656 009_lines_...
09:01  1395 009_words_...
09:05 11664 010_lines_...
09:06  1395 010_words_...
09:11 10935 011_lines_...
09:11  1521
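The scheduling behaviour TD describes (batch X+1 starts only after batch X, retries included, has finished) and the alternative in Binh's EachSeqRDD (generate no job while another is pending) can be compared without Spark. The sketch below is a plain-Java model, not Spark code; it assumes batch i becomes ready at i × interval and needs a known processing time, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of two batch-scheduling policies (no Spark classes involved).
// Assumption: batch i becomes ready at i * interval and needs a known processing time.
final class BatchScheduleModel {

    // Default policy as described in the thread: batches queue up and run one after another,
    // so batch i starts at max(its ready time, finish time of batch i - 1).
    static long[] serialFinishTimes(long interval, long[] durations) {
        long[] finish = new long[durations.length];
        long previousFinish = 0;
        for (int i = 0; i < durations.length; i++) {
            long ready = i * interval;
            long start = Math.max(ready, previousFinish);
            finish[i] = start + durations[i];
            previousFinish = finish[i];
        }
        return finish;
    }

    // Scheduling delay under the serial policy: how long each batch waited in the queue.
    static long[] serialSchedulingDelays(long interval, long[] durations) {
        long[] finish = serialFinishTimes(interval, durations);
        long[] delays = new long[durations.length];
        for (int i = 0; i < durations.length; i++) {
            long start = finish[i] - durations[i];
            delays[i] = start - i * interval;
        }
        return delays;
    }

    // Skip-if-pending policy in the spirit of EachSeqRDD: a batch that becomes ready
    // while an earlier one is still running gets no job at all.
    // Returns the indices of the batches that actually ran.
    static List<Integer> skipIfPendingRuns(long interval, long[] durations) {
        List<Integer> ran = new ArrayList<>();
        long busyUntil = 0;
        for (int i = 0; i < durations.length; i++) {
            long ready = i * interval;
            if (ready >= busyUntil) {
                ran.add(i);
                busyUntil = ready + durations[i];
            }
        }
        return ran;
    }
}
```

With a 5 s interval and made-up processing times of 3, 12, 4, 4 and 2 s, the serial policy finishes the batches at 3, 17, 21, 25 and 27 s (scheduling delays 0, 0, 7, 6 and 5 s), while skip-if-pending runs only batches 0, 1 and 4. Skipping keeps the queue empty at the cost of producing no job for the skipped batch times; what that means for their data in a real job depends on the parent DStream, which this model does not capture.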
Re: RE: Spark or Storm
I agree with Cody. It's pretty hard for any framework to provide built-in support for that, since the semantics depend entirely on which data store you want to use it with. Providing interfaces helps a little, but even with those interfaces the user still has to do most of the heavy lifting: the user has to understand what is actually going on AND implement all the code needed to ensure unique IDs and atomic updates of the data, according to the capabilities and APIs provided by the data store.

On Fri, Jun 19, 2015 at 7:45 AM, Cody Koeninger c...@koeninger.org wrote:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics
"Semantics of output operations" section. Is this really not clear?
As for the general tone of "why doesn't the framework do it for you"... in my opinion, this is essential complexity for delivery semantics in a distributed system, not incidental complexity. You need to actually understand and be responsible for what's going on, unless you're talking about very narrow use cases (i.e. outputting to a known datastore with known semantics and schema).

On Fri, Jun 19, 2015 at 7:26 AM, Ashish Soni asoni.le...@gmail.com wrote:
My understanding of exactly-once semantics was that it is handled by the framework itself, but this is not very clear from the documentation. I believe the documentation needs to be updated with a simple example so that it is clear to the end user. This is very critical when someone is evaluating the framework, does not have enough time to validate all the use cases, and has to rely on the documentation.
Ashish

On Fri, Jun 19, 2015 at 7:10 AM, bit1...@163.com bit1...@163.com wrote:
I think your observation is correct: you have to take care of the replayed data at your end, e.g. each message has a unique id or something else.
I am using "I think" in the above sentence because I am not sure, and I also have a related question: I am wondering how the direct stream + Kafka is implemented when the driver is down and restarted. Will it always first replay the checkpointed failed batch, or will it honor Kafka's offset reset policy (auto.offset.reset)? If it honors the reset policy and it is set to smallest, then it is at-least-once semantics; if it is set to largest, then it will be at-most-once semantics?
--
bit1...@163.com

*From:* Haopu Wang hw...@qilinsoft.com
*Date:* 2015-06-19 18:47
*To:* Enno Shioji eshi...@gmail.com; Tathagata Das t...@databricks.com
*CC:* prajod.vettiyat...@wipro.com; Cody Koeninger c...@koeninger.org; bit1...@163.com; Jordan Pilat jrpi...@gmail.com; Will Briggs wrbri...@gmail.com; Ashish Soni asoni.le...@gmail.com; ayan guha guha.a...@gmail.com; user@spark.apache.org; Sateesh Kavuri sateesh.kav...@gmail.com; Spark Enthusiast sparkenthusi...@yahoo.in; Sabarish Sasidharan sabarish.sasidha...@manthan.com
*Subject:* RE: RE: Spark or Storm

My question is not directly related: about the exactly-once semantics, the document (copied below) says Spark Streaming gives exactly-once semantics, but from my test results, with checkpointing enabled, the application always re-processes the files in the last batch after a graceful restart.

==
*Semantics of Received Data*
Different input sources provide different guarantees, ranging from *at-least once* to *exactly once*. Read for more details.
*With Files*
If all of the input data is already present in a fault-tolerant file system like HDFS, Spark Streaming can always recover from any failure and process all the data. This gives *exactly-once* semantics, that all the data will be processed exactly once no matter what fails.
--
*From:* Enno Shioji [mailto:eshi...@gmail.com]
*Sent:* Friday, June 19, 2015 5:29 PM
*To:* Tathagata Das
*Cc:* prajod.vettiyat...@wipro.com; Cody Koeninger; bit1...@163.com; Jordan Pilat; Will Briggs; Ashish Soni; ayan guha; user@spark.apache.org; Sateesh Kavuri; Spark Enthusiast; Sabarish Sasidharan
*Subject:* Re: RE: Spark or Storm

Fair enough; on second thought, just saying that it should be idempotent is indeed more confusing.
I guess the crux of the confusion comes from the fact that people tend to assume the work you described (storing the batch id, skipping, etc.) is handled by the framework, perhaps partly because Storm Trident does handle it (you just need to let Storm know whether the output operation has succeeded, and it handles the batch-id storing and skipping business). Whenever I explain to people that one needs to do this additional work you described to get end-to-end exactly-once semantics, it usually takes a while to convince them. In my limited experience, they tend to interpret "transactional" in that sentence to mean that you just have to write to transactional storage like an ACID RDB. Pointing them to "Semantics of output operations" is usually sufficient, though.
Maybe others like
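bit1's suggestion (handle replayed data yourself, e.g. via a unique id per message) can be sketched without Spark. Below is a hypothetical plain-Java sink, not a Spark or Kafka API; it assumes each message carries an id that stays the same when a batch is replayed after a restart (Kafka topic, partition and offset would be one such id).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sink that tolerates at-least-once redelivery by remembering message ids.
// A real implementation would keep the ids in the same durable store as the output
// and update both atomically; an in-memory set only illustrates the idea.
final class DedupSink {
    private final Set<String> seenIds = new HashSet<>();
    private long total = 0;

    // Adds each message's value at most once; returns how many messages were newly applied.
    int apply(List<String> ids, List<Long> values) {
        int applied = 0;
        for (int i = 0; i < ids.size(); i++) {
            if (seenIds.add(ids.get(i))) { // add() returns false if the id was already seen
                total += values.get(i);
                applied++;
            }
        }
        return applied;
    }

    long total() {
        return total;
    }
}
```

If a first batch delivers m1 = 5 and m2 = 3, and a replay after a restart delivers m2 = 3 again together with a new m3 = 4, only m3 is applied the second time, so the total goes from 8 to 12 rather than 15.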
Re: createDirectStream and Stats
Yes, please tell us what operations you are using.
TD

On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org wrote:
Is there any more info you can provide / relevant code?

On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote:
Update on performance of the new API: the new code using the createDirectStream API ran overnight, and when I checked the app state in the morning, there were massive scheduling delays :( Not sure why, and I haven't investigated a whole lot. For now, I switched back to the createStream API build of my app. Yes, for the record, this is with CDH 5.4.1 and Spark 1.3.

On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote:
Thanks for the super-fast response, TD :) I will now go bug my Hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D

On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote:
Are you using Spark 1.3.x? That explains it. This issue has been fixed in Spark 1.4.0. Bonus: you get a fancy new streaming UI with more awesome stats. :)

On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote:
Hi,
I just switched from createStream to the createDirectStream API for Kafka, and while things otherwise seem happy, the first thing I noticed is that the stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on the health of the app.
What's the best way to re-create those in the Spark UI? Maintain Accumulators? It would be really nice to get back receiver-like stats, even though I understand that createDirectStream is a receiver-less design.
Thanks,
Tim
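On Spark 1.3, where the direct stream shows no receiver stats, one possible workaround is to derive a per-batch record count from the Kafka offset ranges. The RDDs produced directly by createDirectStream implement HasOffsetRanges, whose OffsetRange entries carry fromOffset and untilOffset (exclusive), so a batch's count is the sum of untilOffset - fromOffset. The sketch below shows only that arithmetic in plain Java: OffsetSpan is a hypothetical stand-in for OffsetRange, and how you surface the numbers (log line, accumulator, metrics system) is left open.

```java
import java.util.List;

// Hypothetical stand-in for one Kafka offset range of a batch; untilOffset is exclusive.
final class OffsetSpan {
    final String topic;
    final int partition;
    final long fromOffset;
    final long untilOffset;

    OffsetSpan(String topic, int partition, long fromOffset, long untilOffset) {
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }
}

final class BatchInputStats {

    // Records covered by one batch across all partitions
    // (assumes consecutive offsets, i.e. no gaps from log compaction).
    static long recordCount(List<OffsetSpan> spans) {
        long n = 0;
        for (OffsetSpan s : spans) {
            n += s.untilOffset - s.fromOffset;
        }
        return n;
    }

    // Average input rate of the batch in records per second.
    static double recordsPerSecond(List<OffsetSpan> spans, double batchIntervalSeconds) {
        return recordCount(spans) / batchIntervalSeconds;
    }
}
```

For example, ranges [100, 250) on partition 0 and [40, 90) on partition 1 give 200 records, or 20 records/s with a 10 s batch interval.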
Re: Assigning number of workers in spark streaming
All the basic parameters apply to both client and cluster mode. The only difference between client and cluster mode is that in cluster mode the driver runs in the cluster, and there are some *additional* parameters to configure that. The other parameters are common. Isn't it clear from the docs?

On Fri, Jun 19, 2015 at 2:54 PM, anshu shukla anshushuk...@gmail.com wrote:
Thanks a lot! But in client mode, can we assign the number of workers/nodes as a flag parameter to the spark-submit command? And by default, how will it distribute the load across the nodes?

# Run on a Spark Standalone cluster in client deploy mode
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

On Sat, Jun 20, 2015 at 3:18 AM, Tathagata Das t...@databricks.com wrote:
It depends on which cluster manager you are using. It's all pretty well documented in the online documentation: http://spark.apache.org/docs/latest/submitting-applications.html

On Fri, Jun 19, 2015 at 2:29 PM, anshu shukla anshushuk...@gmail.com wrote:
Hey,
*[For Client Mode]*
1- Is there any way to assign the number of workers from a cluster that should be used for a particular application?
2- If not, then how does the Spark scheduler decide the scheduling of different applications inside one full logic? Say my logic is {inputStream -> wordsplitter -> wordcount -> statistical analysis}; then on how many workers will it be scheduled?
--
Thanks & Regards,
Anshu Shukla
SERC-IISC
--
Thanks & Regards,
Anshu Shukla
Re: Serial batching with Spark Streaming
);
final JavaDStream<String> lines = context.union(
    context.receiverStream(new GeneratorReceiver()),
    ImmutableList.of(
        context.receiverStream(new GeneratorReceiver()),
        context.receiverStream(new GeneratorReceiver())));
lines.print();

final Accumulator<Integer> lineRddIndex = context.sparkContext().accumulator(0);
lines.foreachRDD(rdd -> {
    lineRddIndex.add(1);
    final String prefix = "/tmp/" + String.format("%03d", lineRddIndex.localValue()) + "_lines_";
    try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
        rdd.collect().forEach(s -> out.println(s));
    }
    return null;
});

final JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")));
final JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
final JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);

final Accumulator<Integer> sleep = context.sparkContext().accumulator(0);
final JavaPairDStream<String, Integer> wordCounts2 = JavaPairDStream.fromJavaDStream(
    wordCounts.transform((rdd) -> {
        sleep.add(1);
        // NOTE: this function runs while the batch's jobs are being prepared,
        // so this sleep blocks job generation rather than the job itself
        Thread.sleep(sleep.localValue() < 6 ? 20000 : 5000);
        return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd), rdd.classTag());
    }));

final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
    (values, state) -> {
        Integer newSum = state.or(0);
        for (final Integer value : values) {
            newSum += value;
        }
        return Optional.of(newSum);
    };

final List<Tuple2<String, Integer>> tuples = ImmutableList.<Tuple2<String, Integer>> of();
final JavaPairRDD<String, Integer> initialRDD = context.sparkContext().parallelizePairs(tuples);

final JavaPairDStream<String, Integer> wordCountsState = wordCounts2.updateStateByKey(
    updateFunction,
    new HashPartitioner(context.sparkContext().defaultParallelism()),
    initialRDD);
wordCountsState.print();

final Accumulator<Integer> rddIndex = context.sparkContext().accumulator(0);
wordCountsState.foreachRDD(rdd -> {
    rddIndex.add(1);
    final String prefix = "/tmp/" + String.format("%03d", rddIndex.localValue()) + "_words_";
    try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
        rdd.collect().forEach(s -> out.println(s));
    }
    return null;
});

context.start();
context.awaitTermination();
}

On 17 June 2015 at 17:25, Tathagata Das t...@databricks.com wrote:
The default behavior should be that batch X + 1 starts processing only after batch X completes. If you are using Spark 1.4.0, could you show us a screenshot of the streaming tab, especially the list of batches? And could you also tell us if you are setting any SparkConf configurations?

On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia mici...@gmail.com wrote:
Is it possible to achieve serial batching with Spark Streaming?
Example:
I configure the Streaming Context for creating a batch every 3 seconds.
Processing of batch #2 takes longer than 3 seconds and creates a backlog of batches:
batch #1 takes 2s
batch #2 takes 10s
batch #3 takes 2s
batch #4 takes 2s
When testing locally, it seems that processing of multiple batches is finished at the same time:
batch #1 finished at 2s
batch #2 finished at 12s
batch #3 finished at 12s (processed in parallel)
batch #4 finished at 15s
How can I delay processing of the next batch, so that it is processed only after processing of the previous batch has been completed?
batch #1 finished at 2s
batch #2 finished at 12s
batch #3 finished at 14s (processed serially)
batch #4 finished at 16s
I want to perform a transformation for every key only once in a given period of time (e.g. batch duration). I find all unique keys in a batch and perform the transformation on each key. To ensure that the transformation is done for every key only once, only one batch can be processed at a time. At the same time, I want that single batch to be processed in parallel.

context = new JavaStreamingContext(conf, Durations.seconds(10));
stream = context.receiverStream(...);
stream
    .reduceByKey(...)
    .transform(...)
    .foreachRDD(output);

Any ideas or pointers are very welcome.
Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
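The updateFunction in Michal's playground code is ordinary Java, so the running word count that updateStateByKey maintains can be modelled outside Spark. This is a hypothetical plain-Java sketch, not Spark's implementation; it uses java.util.Optional in place of the Guava Optional that the Spark 1.x Java API passes in (hence orElse instead of or).

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of the running word count that updateStateByKey maintains.
// java.util.Optional stands in for the Guava Optional used by the Spark 1.x Java API.
final class RunningCount {

    // Same logic as updateFunction: previous state (or 0) plus the values seen in this batch.
    static Optional<Integer> update(List<Integer> values, Optional<Integer> state) {
        int newSum = state.orElse(0);
        for (int value : values) {
            newSum += value;
        }
        return Optional.of(newSum);
    }

    // Applies one batch (word -> counts emitted in this batch) to the state map.
    // Keys absent from the batch are left alone, which matches update() returning
    // the old state unchanged when it is called with an empty value list.
    static void applyBatch(Map<String, Integer> state, Map<String, List<Integer>> batch) {
        for (Map.Entry<String, List<Integer>> e : batch.entrySet()) {
            Optional<Integer> previous = Optional.ofNullable(state.get(e.getKey()));
            state.put(e.getKey(), update(e.getValue(), previous).get());
        }
    }
}
```

After a batch with "a" twice and "b" once, followed by a batch with "a" once, the state holds a = 3 and b = 1.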
Re: Assigning number of workers in spark streaming
It depends on which cluster manager you are using. It's all pretty well documented in the online documentation: http://spark.apache.org/docs/latest/submitting-applications.html

On Fri, Jun 19, 2015 at 2:29 PM, anshu shukla anshushuk...@gmail.com wrote:
Hey,
*[For Client Mode]*
1- Is there any way to assign the number of workers from a cluster that should be used for a particular application?
2- If not, then how does the Spark scheduler decide the scheduling of different applications inside one full logic? Say my logic is {inputStream -> wordsplitter -> wordcount -> statistical analysis}; then on how many workers will it be scheduled?
--
Thanks & Regards,
Anshu Shukla
SERC-IISC
Re: createDirectStream and Stats
I don't think there were any enhancements that would change this behavior.

On Fri, Jun 19, 2015 at 6:16 PM, Tim Smith secs...@gmail.com wrote:
On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com wrote:
Also, can you find from the Spark UI the breakdown of the stages in each batch's jobs, and find which stage is taking more time after a while?

Sure, will try to debug/troubleshoot. Are there enhancements to this specific API between 1.3 and 1.4 that can substantially change its behaviour?

On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org wrote:
When you say your old version was
k = createStream .
were you manually creating multiple receivers? Because otherwise you're only using one receiver on one executor...

Yes, sorry, the earlier/stable version was more like:
kInStreams = (1 to n).map{_ => KafkaUtils.createStream // n being the number of kafka partitions, 1 receiver per partition
val k = ssc.union(kInStreams)
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))
Thanks,
Tim

If that's the case, I'd try the direct stream without the repartitioning.

On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote:
Essentially, I went from:
k = createStream .
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))
To:
kIn = createDirectStream .
k = kIn.repartition(numberOfExecutors) // since #kafka partitions < #spark-executors
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))
With the new API, the app starts up and works fine for a while, but I guess it starts to deteriorate after a while. With the existing createStream API, the app does deteriorate too, but over a much longer period: hours vs days.

On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com wrote:
Yes, please tell us what operations you are using.
TD

On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org wrote:
Is there any more info you can provide / relevant code?

On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote:
Update on performance of the new API: the new code using the createDirectStream API ran overnight, and when I checked the app state in the morning, there were massive scheduling delays :( Not sure why, and I haven't investigated a whole lot. For now, I switched back to the createStream API build of my app. Yes, for the record, this is with CDH 5.4.1 and Spark 1.3.

On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote:
Thanks for the super-fast response, TD :) I will now go bug my Hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D

On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote:
Are you using Spark 1.3.x? That explains it. This issue has been fixed in Spark 1.4.0. Bonus: you get a fancy new streaming UI with more awesome stats. :)

On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote:
Hi,
I just switched from createStream to the createDirectStream API for Kafka, and while things otherwise seem happy, the first thing I noticed is that the stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on the health of the app.
What's the best way to re-create those in the Spark UI? Maintain Accumulators? It would be really nice to get back receiver-like stats, even though I understand that createDirectStream is a receiver-less design.
Thanks,
Tim
Re: createDirectStream and Stats
Also, can you find from the Spark UI the breakdown of the stages in each batch's jobs, and find which stage is taking more time after a while?

On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org wrote:
When you say your old version was
k = createStream .
were you manually creating multiple receivers? Because otherwise you're only using one receiver on one executor...
If that's the case, I'd try the direct stream without the repartitioning.

On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote:
Essentially, I went from:
k = createStream .
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))
To:
kIn = createDirectStream .
k = kIn.repartition(numberOfExecutors) // since #kafka partitions < #spark-executors
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))
With the new API, the app starts up and works fine for a while, but I guess it starts to deteriorate after a while. With the existing createStream API, the app does deteriorate too, but over a much longer period: hours vs days.

On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com wrote:
Yes, please tell us what operations you are using.
TD

On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org wrote:
Is there any more info you can provide / relevant code?

On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote:
Update on performance of the new API: the new code using the createDirectStream API ran overnight, and when I checked the app state in the morning, there were massive scheduling delays :( Not sure why, and I haven't investigated a whole lot. For now, I switched back to the createStream API build of my app. Yes, for the record, this is with CDH 5.4.1 and Spark 1.3.

On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote:
Thanks for the super-fast response, TD :) I will now go bug my Hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D

On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote:
Are you using Spark 1.3.x? That explains it. This issue has been fixed in Spark 1.4.0. Bonus: you get a fancy new streaming UI with more awesome stats. :)

On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote:
Hi,
I just switched from createStream to the createDirectStream API for Kafka, and while things otherwise seem happy, the first thing I noticed is that the stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on the health of the app.
What's the best way to re-create those in the Spark UI? Maintain Accumulators? It would be really nice to get back receiver-like stats, even though I understand that createDirectStream is a receiver-less design.
Thanks,
Tim
Re: RE: Spark or Storm
If the current documentation is confusing, we can definitely improve it. However, I do not understand why the term "transactional" is confusing. If your output operation has to add 5, then the user has to implement the following mechanism:
1. If the unique id of the batch of data is already present in the store, then skip the update.
2. Otherwise, atomically do both the update operation and storing the unique id of the batch.
This is pretty much the definition of a transaction. The user has to be aware of the transactional semantics of the data store while implementing this functionality.
You CAN argue that this effectively makes the whole update sort of idempotent, as even if you try doing it multiple times, it will update only once. But that is not what is generally considered idempotent. Writing a fixed count, not an increment, is usually what is called idempotent. And so just mentioning that the output operation must be idempotent is, in my opinion, more confusing.
To take a page out of the Storm / Trident guide, even they call this exact conditional updating of Trident State a "transactional" operation. See "transactional spout" in the Trident State guide: https://storm.apache.org/documentation/Trident-state
In the end, I am totally open to suggestions and PRs on how to make the programming guide easier to understand. :)
TD

On Thu, Jun 18, 2015 at 11:47 PM, Enno Shioji eshi...@gmail.com wrote:
Tbh I find the doc around this a bit confusing. If it says "end-to-end exactly-once semantics (if your updates to downstream systems are idempotent or transactional)", I think most people will interpret it to mean that as long as you use storage which has atomicity (like MySQL/Postgres etc.), a successful output operation for a given batch (let's say "+ 5") is going to be issued exactly once against the storage.
However, as I understand it, that's not what this statement means.
What it is saying is that it will always issue "+5" and never, say, "+6", because it makes sure a message is processed exactly once internally. However, it *may* issue "+5" more than once for a given batch, and it is up to the developer to deal with this by making the output operation either idempotent (e.g. "set 5") or transactional (e.g. keep track of batch IDs and skip already applied batches, etc.).
I wonder if it makes more sense to drop "or transactional" from the statement, because if you think about it, ultimately what you are asked to do is make the writes idempotent even with the transactional approach, and "transactional" is a bit loaded and prone to lead to misunderstandings (even though, in fairness, if you read the fault-tolerance chapter, it explains this explicitly).

On Fri, Jun 19, 2015 at 2:56 AM, prajod.vettiyat...@wipro.com wrote:
More details on the Direct API of Spark 1.3 are on the Databricks blog: https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html
Note the use of checkpoints to persist the Kafka offsets in Spark Streaming itself, and not in ZooKeeper.
Also this statement: "... This allows one to build a Spark Streaming + Kafka pipelines with end-to-end exactly-once semantics (if your updates to downstream systems are idempotent or transactional)."

*From:* Cody Koeninger [mailto:c...@koeninger.org]
*Sent:* 18 June 2015 19:38
*To:* bit1...@163.com
*Cc:* Prajod S Vettiyattil (WT01 - BAS); jrpi...@gmail.com; eshi...@gmail.com; wrbri...@gmail.com; asoni.le...@gmail.com; ayan guha; user; sateesh.kav...@gmail.com; sparkenthusi...@yahoo.in; sabarish.sasidha...@manthan.com
*Subject:* Re: RE: Spark or Storm

That general description is accurate, but not really a specific issue of the direct stream. It applies to anything consuming from Kafka (or, as Matei already said, any streaming system really). You can't have exactly-once semantics unless you know something more about how you're storing results.
For a unique id, topic/partition and offset is usually the obvious choice, which is why it's important that the direct stream gives you access to the offsets.
See https://github.com/koeninger/kafka-exactly-once for more info.

On Thu, Jun 18, 2015 at 6:47 AM, bit1...@163.com bit1...@163.com wrote:
I am wondering how the direct stream API ensures end-to-end exactly-once semantics.
I think there are two things involved:
1. From the Spark Streaming end, the driver will replay the offset range when it's down and restarted, which means that the new tasks will process some already-processed data.
2. From the user end, since tasks may process already-processed data, the user end should detect that some data has already been processed, e.g. using some unique ID.
Not sure if I have understood correctly.
--
bit1...@163.com

*From:* prajod.vettiyat...@wipro.com
*Date:* 2015-06-18 16:56
*To:* jrpi...@gmail.com; eshi...@gmail.com
*CC:* wrbri...@gmail.com;
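TD's two steps (skip a batch id that is already stored; otherwise apply the update and store the id atomically), and the contrast with an idempotent "set 5", can be sketched in plain Java. This is a hypothetical in-memory model, not a Spark API: the batch id built from topic, partition and offset range follows Cody's suggestion, but its string format is an assumption, and synchronized methods only stand in for a real database transaction.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// In-memory stand-in for a data store. The synchronized methods only mimic a transaction
// inside one JVM; a real store would commit the update and the batch id together.
final class CounterStore {
    private final Map<String, Long> counters = new HashMap<>();
    private final Set<String> appliedBatchIds = new HashSet<>();

    // Transactional "+delta": step 1 skips an already applied batch id,
    // step 2 applies the increment and records the id as one unit.
    synchronized boolean addOnce(String batchId, String key, long delta) {
        if (appliedBatchIds.contains(batchId)) {
            return false; // replayed batch: skip the update
        }
        counters.merge(key, delta, Long::sum);
        appliedBatchIds.add(batchId);
        return true;
    }

    // Idempotent write: setting a fixed value yields the same result however often it is repeated.
    synchronized void set(String key, long value) {
        counters.put(key, value);
    }

    synchronized long get(String key) {
        return counters.getOrDefault(key, 0L);
    }

    // One possible batch id for a Kafka direct stream: topic, partition and offset range.
    static String batchId(String topic, int partition, long fromOffset, long untilOffset) {
        return topic + "/" + partition + "/" + fromOffset + "-" + untilOffset;
    }
}
```

Replaying the same batch (same id) leaves the counter at 5 instead of 10, whereas repeating set("fixed", 5) is harmless on its own. In a relational database, one common equivalent is to insert the batch id into a table with a unique constraint and apply the update in the same transaction, so a replay fails the insert and rolls back.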
Re: understanding on the waiting batches and scheduling delay in Streaming UI
Also, could you give a screenshot of the streaming UI? Even better, could you run it on Spark 1.4, which has a new streaming UI, and then use that for debugging/screenshots?
TD

On Thu, Jun 18, 2015 at 3:05 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
Which version of Spark? And what is your data source? For some reason, your processing delay is exceeding the batch duration, and it's strange that you are not seeing any scheduling delay.
Thanks
Best Regards

On Thu, Jun 18, 2015 at 7:29 AM, Mike Fang chyfan...@gmail.com wrote:
Hi,
I have a Spark Streaming program that has been running for ~25 hrs. When I check the Streaming UI tab, I find that "Waiting batches" is 144, but "Scheduling delay" is 0. I am a bit confused. If "Waiting batches" is 144, that means many batches are waiting in the queue to be processed? If this is the case, the scheduling delay should be high rather than 0. Am I missing anything?
Thanks,
Mike
Re: Latency between the RDD in Streaming
Why do you need to uniquely identify the message? All you need is the time when the message was inserted by the receiver, and when it is processed, isnt it? On Thu, Jun 18, 2015 at 2:28 PM, anshu shukla anshushuk...@gmail.com wrote: Thanks alot , But i have already tried the second way ,Problem with that is that how to identify the particular RDD from source to sink (as we can do by passing a msg id in storm) . For that i just updated RDD and added a msgID (as static variable) . but while dumping them to file some of the tuples of RDD are failed/missed (approx 3000 and data rate is aprox 1500 tuples/sec). On Fri, Jun 19, 2015 at 2:50 AM, Tathagata Das t...@databricks.com wrote: Couple of ways. 1. Easy but approx way: Find scheduling delay and processing time using StreamingListener interface, and then calculate end-to-end delay = 0.5 * batch interval + scheduling delay + processing time. The 0.5 * batch inteval is the approx average batching delay across all the records in the batch. 2. Hard but precise way: You could build a custom receiver that embeds the current timestamp in the records, and then compare them with the timestamp at the final step of the records. Assuming the executor and driver clocks are reasonably in sync, this will measure the latency between the time is received by the system and the result from the record is available. On Thu, Jun 18, 2015 at 2:12 PM, anshu shukla anshushuk...@gmail.com wrote: Sorry , i missed the LATENCY word.. for a large streaming query .How to find the time taken by the particular RDD to travel from initial D-STREAM to final/last D-STREAM . Help Please !! On Fri, Jun 19, 2015 at 12:40 AM, Tathagata Das t...@databricks.com wrote: Its not clear what you are asking. Find what among RDD? On Thu, Jun 18, 2015 at 11:24 AM, anshu shukla anshushuk...@gmail.com wrote: Is there any fixed way to find among RDD in stream processing systems , in the Distributed set-up . 
-- Thanks Regards, Anshu Shukla
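TD's point is that no per-message ID is needed: each record only has to carry the time at which it was received. A minimal stdlib-only sketch of that bookkeeping follows. It assumes a hypothetical `Stamped` wrapper that a custom receiver would create when storing each record (the receiver itself is not shown) and that driver and executor clocks are roughly in sync.

```scala
// Hypothetical record wrapper (not a Spark API): a custom receiver would
// attach the wall-clock time, in ms, at which it received each record.
final case class Stamped[T](receivedAtMs: Long, value: T)

object RecordLatency {
  // Latency of one record, measured when it reaches the last step of the job.
  def latencyMs[T](record: Stamped[T], nowMs: Long): Long =
    nowMs - record.receivedAtMs

  // (min, max, mean) latency in ms for records observed at the same instant;
  // None when there are no records.
  def summarize[T](records: Seq[Stamped[T]], nowMs: Long): Option[(Long, Long, Double)] =
    if (records.isEmpty) None
    else {
      val latencies = records.map(r => latencyMs(r, nowMs))
      Some((latencies.min, latencies.max, latencies.sum.toDouble / latencies.size))
    }
}
```

With this approach nothing has to identify a particular RDD; the start time travels with each record. How the records reach the final step (and whether the summary is computed per partition and then merged) is left out of the sketch.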
Re: createDirectStream and Stats
Are you using Spark 1.3.x? That explains it. This issue has been fixed in Spark 1.4.0. Bonus: you get a fancy new streaming UI with more awesome stats. :) On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote: Hi, I just switched from createStream to the createDirectStream API for Kafka, and while things otherwise seem happy, the first thing I noticed is that the stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on the health of the app. What's the best way to re-create them in the Spark UI? Maintain accumulators? It would be really nice to get back receiver-like stats, even though I understand that createDirectStream is a receiver-less design. Thanks, Tim
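Until an upgrade is possible, one way to approximate receiver-like counts (an illustration, not something suggested in the thread) is to use the fact that each direct-stream batch covers a known offset range per Kafka partition, so its message count is `untilOffset - fromOffset` summed over the ranges. The sketch below shows only that arithmetic, with a local `OffsetSpan` stand-in instead of Spark's own class so it runs without Spark. In the Spark 1.3 direct API, the real ranges for a batch can be read inside `foreachRDD` via `rdd.asInstanceOf[HasOffsetRanges].offsetRanges`, on the RDD that comes straight from `createDirectStream`. How the counts are then published (logs, metrics, accumulators) is left open.

```scala
// Local stand-in for the fields of a Kafka direct-stream OffsetRange, so the
// sketch runs without Spark. untilOffset is exclusive.
final case class OffsetSpan(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object DirectStreamStats {
  // Number of messages covered by one span.
  def messages(span: OffsetSpan): Long = span.untilOffset - span.fromOffset

  // Total messages per topic in one batch.
  def perTopic(spans: Seq[OffsetSpan]): Map[String, Long] =
    spans.groupBy(_.topic).map { case (topic, ss) => topic -> ss.map(messages).sum }
}
```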
Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Glad to hear that. :) On Thu, Jun 18, 2015 at 6:25 AM, Ji ZHANG zhangj...@gmail.com wrote: Hi, We switched from ParallelGC to CMS, and the symptom is gone. On Thu, Jun 4, 2015 at 3:37 PM, Ji ZHANG zhangj...@gmail.com wrote: Hi, I set spark.shuffle.io.preferDirectBufs to false in SparkConf, and this setting can be seen in the web UI's Environment tab. But it still eats memory, i.e. -Xmx is set to 512M but RES grows to 1.5G in half a day. On Wed, Jun 3, 2015 at 12:02 PM, Shixiong Zhu zsxw...@gmail.com wrote: Could you set spark.shuffle.io.preferDirectBufs to false to turn off Netty's off-heap allocation? Best Regards, Shixiong Zhu 2015-06-03 11:58 GMT+08:00 Ji ZHANG zhangj...@gmail.com: Hi, Thanks for your information. I'll give Spark 1.4 a try when it's released. On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das t...@databricks.com wrote: Could you try it out with Spark 1.4 RC3? Also pinging the Cloudera folks; they may be aware of something. BTW, the way I have debugged memory leaks in the past is as follows. Run with a small driver memory, say 1 GB. Periodically (maybe with a script), take histogram snapshots and also do memory dumps, say every hour. Then compare two histograms/dumps taken a few hours apart (the further apart, the better). Diffing histograms is easy. Diffing two dumps can be done in JVisualVM; it will show the difference in the objects that got added in the later dump. That makes it easy to debug what is not getting cleaned up. TD On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG zhangj...@gmail.com wrote: Hi, Thanks for your reply.
Here are the top 30 entries of the jmap -histo:live result:
 num     #instances         #bytes  class name
----------------------------------------------
   1:         40802      145083848  [B
   2:         99264       12716112  methodKlass
   3:         99264       12291480  constMethodKlass
   4:          8472        9144816  constantPoolKlass
   5:          8472        7625192  instanceKlassKlass
   6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
   7:          7045        4804832  constantPoolCacheKlass
   8:        139168        4453376  java.util.HashMap$Entry
   9:          9427        3542512  methodDataKlass
  10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
  11:        135491        3251784  java.lang.Long
  12:         26192        2765496  [C
  13:           813        1140560  [Ljava.util.HashMap$Entry;
  14:          8997        1061936  java.lang.Class
  15:         16022         851384  [[I
  16:         16447         789456  java.util.zip.Inflater
  17:         13855         723376  [S
  18:         17282         691280  java.lang.ref.Finalizer
  19:         25725         617400  java.lang.String
  20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
  21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
  22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
  23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
  24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
  25:         16447         394728  java.util.zip.ZStreamRef
  26:           565         370080  [I
  27:           508         272288  objArrayKlassKlass
  28:         16233         259728  java.lang.Object
  29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
  30:          2524         192312  [Ljava.lang.Object;
But as I mentioned above, the heap memory seems OK; the extra memory is consumed by some off-heap data, and I can't find a way to figure out what is in there. Besides, I did some extra experiments, i.e. I ran the same program in different environments to test whether it has the off-heap memory issue:
spark1.0 + standalone = no
spark1.0 + yarn = no
spark1.3 + standalone = no
spark1.3 + yarn = yes
I'm using CDH 5.1, so Spark 1.0 is provided by CDH, and spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.
I could use spark1.0 + yarn, but I can't find a way to handle the logs (level and rolling), so they would fill up the hard drive. For now I'll stick to spark1.0 + standalone until our ops team decides to upgrade CDH. On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com wrote: While it is running, is it possible for you to log into the YARN node and get histograms of live objects using jmap -histo:live? That may reveal something. On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote: Hi, Unfortunately, they're still growing, both driver and executors. When I run the same job in local mode, everything is fine. On Thu, May 28, 2015 at 5:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you replace your counting part with this? logs.filter
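TD's leak-hunting recipe compares class histograms taken a few hours apart. A minimal stdlib-only sketch of the "diffing histo" step follows. It assumes each snapshot was saved as the plain-text output of `jmap -histo:live <pid>` (rows like `1: 40802 145083848 [B`). The `HistoDiff` name and the ranking by byte growth are illustrative choices.

```scala
object HistoDiff {
  // One data row of `jmap -histo` output: "   1:   40802   145083848  [B".
  // Header and separator lines do not match and are skipped.
  private val Row = """^\s*\d+:\s+(\d+)\s+(\d+)\s+(\S+).*$""".r

  // class name -> (instances, bytes)
  def parse(lines: Seq[String]): Map[String, (Long, Long)] =
    lines.collect { case Row(n, b, cls) => (cls, (n.toLong, b.toLong)) }.toMap

  // Classes whose byte count grew between two snapshots, largest growth first.
  def growth(before: Seq[String], after: Seq[String]): Seq[(String, Long)] = {
    val old = parse(before)
    parse(after).toSeq
      .map { case (cls, (_, bytes)) => (cls, bytes - old.get(cls).map(_._2).getOrElse(0L)) }
      .filter { case (_, delta) => delta > 0 }
      .sortBy { case (_, delta) => -delta }
  }
}
```

A heap histogram only covers on-heap objects. Since Ji's growth appeared to be off-heap, a diff like this could come back nearly empty, which would be consistent with the observation that the heap looks fine.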
Re: [Spark Streaming] Runtime Error in call to max function for JavaPairRDD
I think you may be including a different version of Spark Streaming in your assembly. Please mark spark-core and spark-streaming as provided dependencies. Any installation of Spark will automatically provide Spark in the classpath, so you do not have to bundle it. On Thu, Jun 18, 2015 at 8:44 AM, Nipun Arora nipunarora2...@gmail.com wrote: Hi, I have the following piece of code, where I am trying to transform a Spark stream and add the min and max of each RDD to it. However, at run time I get an error saying the max call does not exist (it compiles properly). I am using spark-1.4. I have added the question to Stack Overflow as well: http://stackoverflow.com/questions/30902090/adding-max-and-min-in-spark-stream-in-java/30909796#30909796 Any help is greatly appreciated :) Thanks Nipun
JavaPairDStream<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> sortedtsStream = transformedMaxMintsStream.transformToPair(new Sort2());
sortedtsStream.foreach(
    new Function<JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>, Void>() {
        @Override
        public Void call(JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> tuple2Tuple3JavaPairRDD) throws Exception {
            List<Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>> templist = tuple2Tuple3JavaPairRDD.collect();
            for (Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> tuple : templist) {
                Date date = new Date(tuple._1._1);
                int pattern = tuple._1._2;
                int count = tuple._2._1();
                Date maxDate = new Date(tuple._2._2());
                Date minDate = new Date(tuple._2._3()); // assumed: _2 holds the max and _3 the min
                System.out.println("TimeSlot: " + date.toString() + " Pattern: " + pattern + " Count: " + count + " Max: " + maxDate.toString() + " Min: " + minDate.toString());
            }
            return null;
        }
    }
);
Error:
15/06/18 11:05:06 INFO BlockManagerInfo: Added input-0-1434639906000 in memory on localhost:42829 (size: 464.0 KB, free: 264.9 MB)
15/06/18 11:05:06 INFO BlockGenerator: Pushed block input-0-1434639906000
Exception in thread "JobGenerator" java.lang.NoSuchMethodError:
org.apache.spark.api.java.JavaPairRDD.max(Ljava/util/Comparator;)Lscala/Tuple2; at org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:346) at org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:340) at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$3(JavaDStreamLike.scala:360) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361) at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonf
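For an sbt build, TD's advice could look like the fragment below. This is a sketch: the 1.4.0 version is taken from the Spark version Nipun mentions and should match the Spark installation the job is submitted to. In a Maven pom, the equivalent is `<scope>provided</scope>` on the same two artifacts.

```scala
// build.sbt fragment: Spark is supplied by the installation at runtime,
// so mark it "provided" to keep it out of the assembly jar.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.4.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.4.0" % "provided"
)
```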
Re: Latency between the RDD in Streaming
It's not clear what you are asking. Find what among RDD? On Thu, Jun 18, 2015 at 11:24 AM, anshu shukla anshushuk...@gmail.com wrote: Is there any fixed way to find among RDD in stream processing systems, in the distributed set-up? -- Thanks Regards, Anshu Shukla
Re: Latency between the RDD in Streaming
A couple of ways. 1. Easy but approximate way: find the scheduling delay and processing time using the StreamingListener interface, and then calculate end-to-end delay = 0.5 * batch interval + scheduling delay + processing time. The 0.5 * batch interval is the approximate average batching delay across all the records in the batch. 2. Hard but precise way: you could build a custom receiver that embeds the current timestamp in the records, and then compare it with the timestamp at the final step of the records. Assuming the executor and driver clocks are reasonably in sync, this will measure the latency between the time a record is received by the system and the time the result from the record is available. On Thu, Jun 18, 2015 at 2:12 PM, anshu shukla anshushuk...@gmail.com wrote: Sorry, I missed the word LATENCY. For a large streaming query, how can I find the time taken by a particular RDD to travel from the initial DStream to the final/last DStream? Help please! On Fri, Jun 19, 2015 at 12:40 AM, Tathagata Das t...@databricks.com wrote: It's not clear what you are asking. Find what among RDD? On Thu, Jun 18, 2015 at 11:24 AM, anshu shukla anshushuk...@gmail.com wrote: Is there any fixed way to find among RDD in stream processing systems, in the distributed set-up? -- Thanks Regards, Anshu Shukla
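TD's first (approximate) formula works out as follows in a tiny example; the `EndToEndDelay` name is hypothetical. All inputs are assumed to be in milliseconds. In Spark 1.x, that is the unit in which a `StreamingListener` receives `schedulingDelay` and `processingDelay` (both `Option[Long]`) on the `BatchInfo` passed to `onBatchCompleted`. The listener wiring is not shown.

```scala
object EndToEndDelay {
  // On average a record waits half a batch interval before its batch is cut,
  // then the batch waits to be scheduled and finally runs.
  def approxMs(batchIntervalMs: Long, schedulingDelayMs: Long, processingTimeMs: Long): Double =
    0.5 * batchIntervalMs + schedulingDelayMs + processingTimeMs
}
```

For example, with a 2 s batch interval, 100 ms scheduling delay and 900 ms processing time, the estimate is 1000 + 100 + 900 ms, i.e. about 2 s from receipt to result.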
Re: Serial batching with Spark Streaming
The default behavior should be that batch X + 1 starts processing only after batch X completes. If you are using Spark 1.4.0, could you show us a screenshot of the Streaming tab, especially the list of batches? And could you also tell us whether you are setting any SparkConf configurations? On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia mici...@gmail.com wrote: Is it possible to achieve serial batching with Spark Streaming? Example: I configure the StreamingContext to create a batch every 3 seconds. Processing of batch #2 takes longer than 3 seconds and creates a backlog of batches: batch #1 takes 2s, batch #2 takes 10s, batch #3 takes 2s, batch #4 takes 2s. When testing locally, it seems that processing of multiple batches finishes at the same time: batch #1 finished at 2s, batch #2 finished at 12s, batch #3 finished at 12s (processed in parallel), batch #4 finished at 15s. How can I delay processing of the next batch, so that it is processed only after processing of the previous batch has completed? batch #1 finished at 2s, batch #2 finished at 12s, batch #3 finished at 14s (processed serially), batch #4 finished at 16s. I want to perform a transformation for every key only once in a given period of time (e.g. the batch duration). I find all unique keys in a batch and perform the transformation on each key. To ensure that the transformation is done for every key only once, only one batch can be processed at a time. At the same time, I want that single batch to be processed in parallel.
context = new JavaStreamingContext(conf, Durations.seconds(10));
stream = context.receiverStream(...);
stream.reduceByKey(...).transform(...).foreachRDD(output);
Any ideas or pointers are very welcome. Thanks! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
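To see what "batch X + 1 starts only after batch X completes" implies for a backlog, here is a small stdlib-only simulation of serial batch processing (not Spark code). It assumes batch i is generated at i * interval starting from 0, so its timeline differs slightly from Michal's hand-worked one. Once one batch overruns, later batches wait in the queue and accumulate scheduling delay instead of running in parallel.

```scala
object SerialBatches {
  final case class Batch(id: Int, arrivalMs: Long, startMs: Long, endMs: Long) {
    // Time the batch spent waiting in the queue before it could start.
    def schedulingDelayMs: Long = startMs - arrivalMs
  }

  // Batch i (0-based) is generated at i * intervalMs and cannot start
  // until the previous batch has finished.
  def simulate(intervalMs: Long, processingMs: Seq[Long]): Seq[Batch] = {
    val out = scala.collection.mutable.ArrayBuffer.empty[Batch]
    var prevEnd = 0L
    for ((proc, i) <- processingMs.zipWithIndex) {
      val arrival = i * intervalMs
      val start = math.max(arrival, prevEnd)
      prevEnd = start + proc
      out += Batch(i + 1, arrival, start, prevEnd)
    }
    out.toSeq
  }
}
```

With Michal's inputs (3 s interval; 2, 10, 2 and 2 s of processing), the batches end at 2, 13, 15 and 17 s, and batches #3 and #4 see 7 s and 6 s of scheduling delay.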
Re: Spark or Storm
To add more information beyond what Matei said and answer the original question, here are other things to consider when comparing Spark Streaming and Storm.
* Unified programming model and semantics - On most occasions, you also have to process the same data again in batch jobs. If you have two separate systems for batch and streaming, it's much, much harder to share the code. You will have to deal with different processing models, each with its own semantics. Compare Storm's join with a usual batch join: their semantics differ, whereas Spark and Spark Streaming share the same join semantics, as they are based on the same RDD model underneath.
* Integration with the Spark ecosystem - Many people really want to go beyond basic streaming ETL and into advanced streaming analytics:
  - Combine stream processing with static datasets
  - Apply dynamically updated machine learning models (i.e. offline learning and online prediction, or even continuous learning and prediction)
  - Apply DataFrame and SQL operations with streaming
  These things are pretty easy to do with the Spark ecosystem.
* Operational management - You have to consider the operational cost of managing two separate systems for batch and stream processing (each with its own deployment model) vs. managing one single engine with one deployment model.
* Performance - According to Intel's independent study, Spark Streaming in Kafka direct mode can achieve 2.5-3x the throughput of Trident with a 0.5GB batch size. And at that batch size, the latency of Trident is 30 seconds, compared to 1.5 seconds for Spark Streaming. This is from a talk that Intel gave at the Spark Summit (https://spark-summit.org/2015/) two days ago. Slides will be available soon, but here is a sneak peek - throughput: http://i.imgur.com/u6pf4rB.png and latency: http://imgur.com/c46MJ4i. I will post the link to the slides when they come out, hopefully next week.
On Wed, Jun 17, 2015 at 11:55 AM, Matei Zaharia matei.zaha...@gmail.com wrote: The major difference is that in Spark Streaming, there's no *need* for a TridentState for state inside your computation. All the stateful operations (reduceByWindow, updateStateByKey, etc.) automatically handle exactly-once processing, keeping updates in order, etc. Also, you don't need to run a separate transactional system (e.g. MySQL) to store the state. After your computation runs, if you want to write the final results (e.g. the counts you've been tracking) to a storage system, you use one of the output operations (saveAsFiles, foreach, etc.). Those actually will run in order, but some might run multiple times if nodes fail, thus attempting to write the same state again. You can read about how it works in this research paper: http://people.csail.mit.edu/matei/papers/2013/sosp_spark_streaming.pdf. Matei On Jun 17, 2015, at 11:49 AM, Enno Shioji eshi...@gmail.com wrote: Hi Matei, Ah, can't get more accurate than from the horse's mouth... If you don't mind helping me understand it correctly: from what I understand, Storm Trident does the following (when used with Kafka): 1) Sit on the Kafka spout and create batches 2) Assign a global sequential ID to the batches 3) Make sure that all results of processed batches are written once to TridentState, *in order* (for example, by skipping batches that were already applied once, ultimately by using ZooKeeper). TridentState is an interface that you have to implement, and the underlying storage has to be transactional for this to work. The necessary skipping etc. is handled by Storm. In the case of Spark Streaming, I understand that 1) There is no global ordering; e.g. an output operation for a batch consisting of offsets [4,5,6] can be invoked before the operation for offsets [1,2,3] 2) If you wanted to achieve something similar to what TridentState does, you'd have to do it yourself (for example using ZooKeeper). Is this a correct understanding?
On Wed, Jun 17, 2015 at 7:14 PM, Matei Zaharia matei.zaha...@gmail.com wrote: This documentation is only for writes to an external system, but all the counting you do within your streaming app (e.g. if you use reduceByKeyAndWindow to keep track of a running count) is exactly-once. When you write to a storage system, no matter which streaming framework you use, you'll have to make sure the writes are idempotent, because the storage system can't know whether you meant to write the same data again or not. But where Spark Streaming helps over Storm, etc. is in tracking state within your computation. Without that facility, you'd not only have to make sure that writes are idempotent, but you'd also have to make sure that updates to your own internal state (e.g. reduceByKeyAndWindow) are exactly-once. Matei On Jun 17, 2015, at 8:26 AM, Enno Shioji eshi...@gmail.com wrote: The thing is, even with that improvement, you still have to make updates idempotent or transactional yourself. If you read
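Matei's point about idempotent writes can be shown with a toy example. A hypothetical in-memory `CountStore`, standing in for an external system, receives the output of the same batch twice, as could happen if an output operation is re-run after a failure. Keying each write by batch time makes the replay harmless, while a running total double-counts. Using the batch time as the key is an assumption for illustration. In Spark Streaming, the two-argument `foreachRDD((rdd, time) => ...)` is one place such a time is available.

```scala
import scala.collection.mutable

// Toy store standing in for an external system (hypothetical).
final class CountStore {
  // Idempotent layout: one entry per (batch time, key).
  private val byBatch = mutable.Map.empty[(Long, String), Long]
  // Non-idempotent layout: one running total per key.
  private val running = mutable.Map.empty[String, Long]

  // Writing the same batch twice leaves the same state as writing it once.
  def upsert(batchTimeMs: Long, key: String, count: Long): Unit =
    byBatch((batchTimeMs, key)) = count

  // Writing the same batch twice counts it twice.
  def increment(key: String, count: Long): Unit =
    running(key) = running.getOrElse(key, 0L) + count

  def idempotentTotal(key: String): Long =
    byBatch.collect { case ((_, k), c) if k == key => c }.sum

  def runningTotal(key: String): Long = running.getOrElse(key, 0L)
}
```

If the batch at time 1000 (count 5) is written twice and the batch at time 2000 (count 3) once, `idempotentTotal` reports 8, while `runningTotal` reports 13.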
Re: Spark Streaming reads from stdin or output from command line utility
Is a lot of data expected to come through stdin? I mean, is it even worth parallelizing the computation using something like Spark Streaming? On Thu, Jun 11, 2015 at 9:56 PM, Heath Guo heath...@fb.com wrote: Thanks for your reply! In my use case, it would be a stream from only one stdin. Also, I'm working with Scala. It would be great if you could talk about the multi-stdin case as well! Thanks. From: Tathagata Das t...@databricks.com Date: Thursday, June 11, 2015 at 8:11 PM To: Heath Guo heath...@fb.com Cc: user user@spark.apache.org Subject: Re: Spark Streaming reads from stdin or output from command line utility Are you going to receive data from one stdin on one machine, or many stdins on many machines? On Thu, Jun 11, 2015 at 7:25 PM, foobar heath...@fb.com wrote: Hi, I'm new to Spark Streaming, and I want to create an application where Spark Streaming could create a DStream from stdin. Basically, I have a command-line utility that generates stream data, and I'd like to pipe the data into a DStream. What's the best way to do that? I thought rdd.pipe() could help, but it seems to require an RDD in the first place, which does not apply here. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reads-from-stdin-or-output-from-command-line-utility-tp23289.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Re: How to keep a SQLContext instance alive in a spark streaming application's life cycle?
BTW, in Spark 1.4, announced today, I added SQLContext.getOrCreate, so you don't need to create the singleton yourself. On Wed, Jun 10, 2015 at 3:21 AM, Sergio Jiménez Barrio drarse.a...@gmail.com wrote: Note: CCing user@spark.apache.org First, you must check if the RDD is empty:
messages.foreachRDD { rdd => if (!rdd.isEmpty) { } }
Now you can obtain the instance of a SQLContext:
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
*Optional* At this point, I like to work with DataFrames, so I convert the RDD to a DataFrame. I see that you receive JSON:
val df: DataFrame = sqlContext.jsonRDD(message, getSchema(getSchemaStr)).toDF()
My getSchema function creates a schema for my JSON:
def getSchemaStr(): String = "feature1 feature2 ..."
def getSchema(schema: String): StructType = StructType(schema.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
I hope this helps. Regards. 2015-06-09 17:36 GMT+02:00 codingforfun [via Apache Spark User List] ml-node+s1001560n23226...@n3.nabble.com: I don't understand why. You said “Why? I tried this solution and it works fine.” Does that mean your SQLContext instance stays alive for the whole lifetime of the streaming application, rather than for one batch duration? My code is below:
object SQLContextSingleton extends java.io.Serializable {
  @transient private var instance: SQLContext = null
  // Instantiate SQLContext on demand
  def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
// type_-typex, id_-id, url_-url
case class (time: Timestamp, id: Int, openfrom: Int, tab: Int) extends Serializable
case class Count(x: Int)
@transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
ssc.checkpoint(".")
val kafkaParams = Map("metadata.broker.list" -> "10.20.30.40:9092")
@transient val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))
@transient val dddstream = newsIdDStream.map(x => x._2).flatMap(x => x.split("\n"))
dddstream.foreachRDD { rdd =>
  SQLContextSingleton.getInstance(rdd.sparkContext).jsonRDD(rdd).registerTempTable("ttable")
  val ret = SQLContextSingleton.getInstance(rdd.sparkContext).sql("SELECT COUNT(*) FROM ttable")
  ret.foreach { x => println(x(0)) }
}
ssc.start()
ssc.awaitTermination()
On 2015-06-09 17:41:44, drarse [via Apache Spark User List] wrote: Why? I tried this solution and it works fine. On Tuesday, June 9, 2015, codingforfun [via Apache Spark User List] wrote: Hi drarse, thanks for replying. The way you suggested, using a singleton object, does not work. On 2015-06-09 16:24:25, drarse [via Apache Spark User List] wrote: The best way is to create a singleton object like:
object SQLContextSingleton {
  @transient private var instance: SQLContext = null
  // Instantiate SQLContext on demand
  def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
You can find more information in the programming guide: https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations 2015-06-09 9:27 GMT+02:00 codingforfun [via Apache Spark User List]: I used SQLContext in a Spark Streaming application as below:
case class topic_name (f1: Int, f2: Int)
val sqlContext = new SQLContext(sc)
@transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
ssc.checkpoint(".")
val theDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))
theDStream.map(x => x._2).foreach { rdd =>
  sqlContext.jsonRDD(newsIdRDD).registerTempTable("topic_name")
  sqlContext.sql("select count(*) from topic_name").foreach { x => WriteToFile(file_path, x(0).toString) }
}
ssc.start()
ssc.awaitTermination()
I found I could only get the message count for each 5-second batch, because “the lifetime of this temporary table is tied to the SQLContext that was used to create this DataFrame”. I guess a new sqlContext is created every 5 seconds, so the temporary table only lives for 5 seconds. I want the sqlContext and the temporary table to stay alive for the whole life cycle of the streaming application. How can I do that? Thanks~
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Let me try to add some clarity to the different directions of thought in this thread.
1. HOW TO DETECT THE NEED FOR MORE CLUSTER RESOURCES?
If no rate limits are set up, the most reliable way to detect whether the current Spark cluster is insufficient to handle the data load is to use the StreamingListener interface, which gives all the information about when batches start and end. See the internal implementation of the StreamingListener called StreamingJobProgressListener; this is the one that drives the streaming UI. You can get the scheduling delay (the time taken for a batch to start processing) from it and use that as a reliable indicator that Spark Streaming is not able to process data as fast as it is being received. But if you have already set rate limits based on the max load that the cluster can handle, then you will probably never detect that the actual input rate into Kafka has gone up and data is getting buffered inside Kafka. In that case, you have to monitor the Kafka load to correctly detect the high load. You may want to use a combination of both techniques for a robust and safe elastic solution: have rate limits set, use a StreamingListener for early detection that the processing load is increasing (it can increase without an actual increase in the data rate), and also make sure through Kafka monitoring that the whole end-to-end system is keeping up.
2. HOW TO GET MORE CLUSTER RESOURCES?
Currently for YARN, you can use the developer API of dynamic allocation that Andrew Or has introduced to ask YARN for more executors. Note that the existing dynamic allocation solution is unlikely to work for streaming and should not be used. Rather, I recommend building your own logic that looks at the streaming scheduling delay and accordingly uses the low-level developer API to directly ask for more executors (sparkContext.requestExecutors). Among other approaches, the Provisioning Component idea can also work.
3. HOW TO TAKE ADVANTAGE OF MORE CLUSTER RESOURCES?
There are two approaches, depending on receiver vs. Kafka direct. I am assuming the number of topic partitions is pre-determined to be large enough to handle peak load. (a) Kafka Direct: This is the simpler scenario. Since there are no receivers, if the cluster gets a new executor, it will automatically start being used to run tasks, including reading from Kafka (remember, the Kafka direct approach reads from Kafka like a file system, from any node that runs the task). So it will immediately start using the extra resources; there is no need to do anything further. (b) Receiver: This is definitely tricky. If you don't need to increase the number of receivers, then a new executor will start being used for computations (shuffles, writing out, etc.), but the parallelism in receiving will not increase. If you need to increase that, then it's best to shut down the context gracefully (so that no data is lost) and start a new StreamingContext with more receivers (# receivers = # executors), and maybe more partitions for shuffles. You have to call stop on the currently running streaming context before starting a new one. When a context is stopped, any thread stuck in awaitTermination will get unblocked. Does that clarify things? On Thu, Jun 11, 2015 at 7:30 AM, Cody Koeninger c...@koeninger.org wrote: Depends on what you're reusing multiple times (if anything). Read http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence On Wed, Jun 10, 2015 at 12:18 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: At which point would I call cache()? I just want the runtime to spill to disk when necessary without me having to know when that is. On Thu, Jun 4, 2015 at 9:42 AM, Cody Koeninger c...@koeninger.org wrote: The direct stream isn't a receiver; it isn't required to cache data anywhere unless you want it to. If you want it, just call cache.
On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: “set the storage policy for the DStream RDDs to MEMORY_AND_DISK”: it appears the storage level can be specified in the createStream methods but not in createDirectStream... On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov evo.efti...@isecc.com wrote: You can also try Dynamic Resource Allocation: https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation Also, regarding the feedback loop for automatic message consumption rate adjustment, there is a “dumb” solution option: simply set the storage policy for the DStream RDDs to MEMORY_AND_DISK. When the memory gets exhausted, Spark Streaming will resort to keeping new RDDs on disk, which will prevent it from crashing and hence losing them. Then some memory will get freed and it will switch back to RAM, and so on and so forth. Original message From: Evo Eftimov Date: 2015/05/28 13:22 (GMT+00:00) To: Dmitry Goldenberg Cc: Gerard Maas, spark users Subject: Re: Autoscaling Spark
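TD's section 2 recommends custom logic driven by the scheduling delay rather than the built-in dynamic allocation. A minimal sketch of such a decision rule follows. The window, threshold and step are made-up parameters, and the rule itself is only one possible policy. In a real driver, the delays could be collected in a `StreamingListener` (`batchCompleted.batchInfo.schedulingDelay`), and a positive result passed to `sparkContext.requestExecutors(n)`, the developer API TD mentions. Neither piece of wiring is shown.

```scala
object ScaleUpPolicy {
  // Hypothetical rule: if each of the last `window` batches waited longer
  // than `thresholdMs` before starting, ask for `step` more executors.
  def executorsToRequest(recentDelaysMs: Seq[Long], window: Int, thresholdMs: Long, step: Int): Int = {
    val last = recentDelaysMs.takeRight(window)
    if (last.size == window && last.forall(_ > thresholdMs)) step else 0
  }
}
```

Requiring several consecutive slow batches is meant to avoid reacting to a single spike. A real policy would probably also need a cool-down period after each request, since new executors take time to start.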
Re: Spark Streaming reads from stdin or output from command line utility
Are you going to receive data from one stdin on one machine, or many stdins on many machines? On Thu, Jun 11, 2015 at 7:25 PM, foobar heath...@fb.com wrote: Hi, I'm new to Spark Streaming, and I want to create an application where Spark Streaming could create a DStream from stdin. Basically, I have a command-line utility that generates stream data, and I'd like to pipe the data into a DStream. What's the best way to do that? I thought rdd.pipe() could help, but it seems to require an RDD in the first place, which does not apply here. Thanks!
Re: Shutdown with streaming driver running in cluster broke master web UI permanently
Do you have event logging enabled? TD On Thu, Jun 11, 2015 at 11:24 AM, scar0909 scar0...@gmail.com wrote: I have the same problem. I realized that the Spark master becomes unresponsive when we kill the leader ZooKeeper (of course, I assigned the leader election task to ZooKeeper). Please let me know if you have any developments.
Re: Join between DStream and Periodically-Changing-RDD
Another approach not mentioned is to use a function to get the RDD that is to be joined. Something like this:
val joined = kvDstream.transform { rdd => val refRdd = getOrUpdateRDD(params...); rdd.join(refRdd) }
The getOrUpdateRDD() function that you implement will get called every batch interval, and you can decide to return the same RDD or an updated RDD whenever you want to. Once updated, if the RDD is going to be used in multiple batch intervals, you should cache it. Furthermore, if you are going to join it, you should partition it with a partitioner, then cache it and make sure that the same partitioner is used for joining. That would be more efficient, as the RDD will stay partitioned in memory, minimizing the cost of the join. On Wed, Jun 10, 2015 at 9:08 AM, Evo Eftimov evo.efti...@isecc.com wrote: It depends on how big the batch RDD requiring reloading is. Reloading it for EVERY single DStream RDD would slow down the stream processing in line with the total time required to reload the batch RDD... But if the batch RDD is not that big, then that might not be an issue, especially in the context of the latency requirements of your streaming app. Another, more efficient and real-time approach may be to represent your batch RDD as DStream RDDs, keep aggregating them over the lifetime of the Spark Streaming app instance, and keep joining them with the actual DStream RDDs. You can feed your HDFS file into a message broker topic and consume it from there in the form of DStream RDDs, which you keep aggregating over the lifetime of the Spark Streaming app instance. *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com] *Sent:* Wednesday, June 10, 2015 8:36 AM *To:* Ilove Data *Cc:* user@spark.apache.org *Subject:* Re: Join between DStream and Periodically-Changing-RDD RDDs are immutable, so why not join two DStreams?
Not sure, but you can also try something like this:
kvDstream.foreachRDD(rdd => { val file = ssc.sparkContext.textFile("/sigmoid/"); val kvFile = file.map(x => (x.split(",")(0), x)); rdd.join(kvFile) })
Thanks Best Regards On Tue, Jun 9, 2015 at 7:37 PM, Ilove Data data4...@gmail.com wrote: Hi, I'm trying to join a DStream with an interval of, let's say, 20s with an RDD loaded from an HDFS folder that changes periodically; let's say a new file arrives in the folder every 10 minutes. How should it be done, considering that files in the HDFS folder are periodically changed or added? Does an RDD automatically detect changes in the HDFS folder it was loaded from and reload itself? Thanks! Rendy
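TD's `getOrUpdateRDD()` idea boils down to "return the cached value unless it is older than some interval". Below is a stdlib-only sketch of that caching logic, using a hypothetical `PeriodicallyRefreshed` helper with an injectable clock so it can be tested. In a Spark job, `load` would be the function that builds the reference RDD (for example `textFile`, `map` to key-value pairs, `partitionBy`, `cache`), and `get()` would be called on the driver once per batch from `transform`. Unpersisting the previous RDD after a reload is left out.

```scala
// Hypothetical helper: returns a cached value, reloading it at most once per
// refreshIntervalMs. `now` is injectable so the behaviour can be tested.
final class PeriodicallyRefreshed[A](refreshIntervalMs: Long, load: () => A, now: () => Long) {
  private var cached: Option[(Long, A)] = None // (loadedAtMs, value)

  def get(): A = synchronized {
    val t = now()
    cached match {
      case Some((loadedAt, v)) if t - loadedAt < refreshIntervalMs => v
      case _ =>
        val v = load()
        cached = Some((t, v))
        v
    }
  }
}
```

With Rendy's numbers (20 s batches, new files every 10 minutes), a 10-minute refresh interval would reuse the same reference data for about 30 consecutive batches.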
Re: Spark SQL and Streaming Results
You could take a look at the RDD *Async operations and their source code. Maybe that can help with getting some early results. TD On Fri, Jun 5, 2015 at 8:41 AM, Pietro Gentile pietro.gentile89.develo...@gmail.com wrote: Hi all, what is the best way to perform Spark SQL queries and obtain the result tuples in a streaming way? In particular, I want to aggregate data and obtain the first, incomplete results quickly, but they should keep being updated until the aggregation is completed. Best Regards.
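As a rough illustration of the "early results" idea, the sketch below folds per-partition results into a running total as each asynchronous piece finishes, so a caller can read a partial aggregate before the whole job is done. It uses plain java.util.concurrent.CompletableFuture as a stand-in; it is not how Spark's AsyncRDDActions are implemented, and ProgressiveSum is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical: keeps a running total over partition results that finish at different times.
final class ProgressiveSum {
    private final AtomicLong partialSum = new AtomicLong();
    private final AtomicInteger finished = new AtomicInteger();
    private final int totalPartitions;

    ProgressiveSum(int totalPartitions) { this.totalPartitions = totalPartitions; }

    // Register one partition's pending result; its value is added as soon as it completes.
    void track(CompletableFuture<Long> partitionResult) {
        partitionResult.thenAccept(value -> {
            partialSum.addAndGet(value);
            finished.incrementAndGet();
        });
    }

    // Early (possibly incomplete) answer: the sum over the partitions finished so far.
    long currentSum() { return partialSum.get(); }

    int finishedPartitions() { return finished.get(); }

    boolean isComplete() { return finished.get() == totalPartitions; }
}
```

Reading currentSum() together with finishedPartitions() gives the "first and incomplete" answer Pietro asks for, which converges to the exact result once isComplete() is true.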
Re: Spark SQL and Streaming Results
I am not sure. Saisai may be able to say more about it. TD On Fri, Jun 5, 2015 at 5:35 PM, Todd Nist tsind...@gmail.com wrote: There used to be a project, StreamSQL (https://github.com/thunderain-project/StreamSQL), but it appears a bit dated and I do not see it in the Spark repo, though I may have missed it. @TD Is this project still active? I'm not sure what the status is, but it may provide some insights on how to achieve what you're looking to do. On Fri, Jun 5, 2015 at 6:34 PM, Tathagata Das t...@databricks.com wrote: You could take a look at the RDD *Async operations and their source code. Maybe that can help with getting some early results. TD On Fri, Jun 5, 2015 at 8:41 AM, Pietro Gentile pietro.gentile89.develo...@gmail.com wrote: Hi all, what is the best way to perform Spark SQL queries and obtain the result tuples in a streaming way? In particular, I want to aggregate data and obtain the first, incomplete results quickly, but they should keep being updated until the aggregation is completed. Best Regards.
Re: Roadmap for Spark with Kafka on Scala 2.11?
But compile scope is supposed to be added to the assembly. https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Scope On Thu, Jun 4, 2015 at 1:24 PM, algermissen1971 algermissen1...@icloud.com wrote: Hi Iulian, On 26 May 2015, at 13:04, Iulian Dragoș iulian.dra...@typesafe.com wrote: On Tue, May 26, 2015 at 10:09 AM, algermissen1971 algermissen1...@icloud.com wrote: Hi, I am setting up a project that requires Kafka support and I wonder what the roadmap is for Scala 2.11 support (including Kafka). Can we expect to see 2.11 support anytime soon? The upcoming 1.4 release (now at RC2) includes support for Kafka and Scala 2.11.6. It'd be great if you could give it a try. You can find the binaries (and the staging repository including 2.11 artifacts) here: https://www.mail-archive.com/dev@spark.apache.org/msg09347.html Feedback after a couple of days: - I am using 1.4.0-rc4 now without problems - I have not used Kafka support yet - I am using this with akka-2.3.11 and akka-http 1.0-RC3 (and sbt-assembly) and this has produced a dependency nightmare. I am even adding Guava manually to the assembly because I just could not get sbt-assembly to stop complaining. I am far from a good understanding of sbt / Maven internals, but it seems that the ‘compile’ scope set in the Spark POM for a lot of dependencies is somehow not honored and the libs end up causing conflicts in sbt-assembly. (I am writing this to share experience, not to complain. Thanks for the great work!!) onward... Jan iulian Jan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- -- Iulian Dragos -- Reactive Apps on the JVM www.typesafe.com
Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Could you try it out with Spark 1.4 RC3? Also pinging the Cloudera folks; they may be aware of something. BTW, the way I have debugged memory leaks in the past is as follows. Run with a small driver memory, say 1 GB. Periodically (maybe with a script), say every hour, take snapshots of the histogram and also take memory dumps. Then compare two histograms/dumps that are a few hours apart (the more the better). Diffing histograms is easy. Diffing two dumps can be done in JVisualVM; it will show the difference in the objects that got added in the later dump. That makes it easy to debug what is not getting cleaned up. TD On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG zhangj...@gmail.com wrote: Hi, Thanks for your reply. Here are the top 30 entries of the jmap -histo:live result:
 num     #instances         #bytes  class name
----------------------------------------------
   1:         40802      145083848  [B
   2:         99264       12716112  methodKlass
   3:         99264       12291480  constMethodKlass
   4:          8472        9144816  constantPoolKlass
   5:          8472        7625192  instanceKlassKlass
   6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
   7:          7045        4804832  constantPoolCacheKlass
   8:        139168        4453376  java.util.HashMap$Entry
   9:          9427        3542512  methodDataKlass
  10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
  11:        135491        3251784  java.lang.Long
  12:         26192        2765496  [C
  13:           813        1140560  [Ljava.util.HashMap$Entry;
  14:          8997        1061936  java.lang.Class
  15:         16022         851384  [[I
  16:         16447         789456  java.util.zip.Inflater
  17:         13855         723376  [S
  18:         17282         691280  java.lang.ref.Finalizer
  19:         25725         617400  java.lang.String
  20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
  21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
  22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
  23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
  24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
  25:         16447         394728  java.util.zip.ZStreamRef
  26:           565         370080  [I
  27:           508         272288  objArrayKlassKlass
  28:         16233         259728  java.lang.Object
  29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
  30:          2524         192312  [Ljava.lang.Object;
But as I mentioned above, the heap memory seems OK; the extra memory is consumed by some off-heap data, and I can't find a way to figure out what is in there. Besides, I did some extra experiments, i.e. ran the same program in different environments to test whether it has the off-heap memory issue:
spark1.0 + standalone = no
spark1.0 + yarn = no
spark1.3 + standalone = no
spark1.3 + yarn = yes
I'm using CDH5.1, so spark1.0 is provided by CDH, and spark-1.3.1-bin-hadoop2.3 is downloaded from the official website. I could use spark1.0 + yarn, but I can't find a way to handle the logs (level and rolling), so they will fill up the hard drive. Currently I'll stick to spark1.0 + standalone until our ops team decides to upgrade CDH. On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com wrote: While it is running, is it possible for you to log in to the YARN node and get histograms of live objects using jmap -histo:live? That may reveal something. On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote: Hi, Unfortunately, they're still growing, both driver and executors. I ran the same job in local mode and everything is fine. On Thu, May 28, 2015 at 5:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you replace your counting part with this? logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count())) Thanks Best Regards On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG zhangj...@gmail.com wrote: Hi, I wrote a simple test job; it only does very basic operations.
for example: val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1)).map(_._2); val logs = lines.flatMap { line => try { Some(parse(line).extract[Impression]) } catch { case _: Exception => None } }; logs.filter(_.s_id > 0).count.foreachRDD { rdd => rdd.foreachPartition { iter => iter.foreach(count => logger.info(count.toString)) } } It receives messages from Kafka, parses the JSON, filters and counts the records, and then prints the count to the logs. Thanks. On Thu, May 28, 2015 at 3:07 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Zhang, Could you paste your
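TD's "diffing histo is easy" step can be scripted. Below is a minimal Java sketch, assuming the JDK 7/8 jmap -histo text layout shown above (rank with colon, #instances, #bytes, class name); HistoDiff is a hypothetical helper, not part of the JDK. It reports the classes whose byte count grew between an earlier and a later snapshot, largest growth first. As Ji points out, this only covers the heap; off-heap growth will not show up here.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class HistoDiff {
    // Parses rows of `jmap -histo[:live] <pid>` output into className -> bytes.
    // Header, separator and "Total" lines are skipped: they do not have the "<rank>: n n name" shape.
    static Map<String, Long> parse(String histo) {
        Map<String, Long> bytesByClass = new HashMap<>();
        for (String line : histo.split("\n")) {
            String[] cols = line.trim().split("\\s+");
            if (cols.length == 4 && cols[0].endsWith(":")) {
                bytesByClass.merge(cols[3], Long.parseLong(cols[2]), Long::sum);
            }
        }
        return bytesByClass;
    }

    // Classes whose byte count grew from the earlier to the later snapshot, largest growth first.
    static List<Map.Entry<String, Long>> growth(String earlier, String later) {
        Map<String, Long> before = parse(earlier);
        Map<String, Long> after = parse(later);
        List<Map.Entry<String, Long>> grown = new ArrayList<>();
        for (Map.Entry<String, Long> e : after.entrySet()) {
            long delta = e.getValue() - before.getOrDefault(e.getKey(), 0L);
            if (delta > 0) {
                grown.add(new AbstractMap.SimpleEntry<>(e.getKey(), delta));
            }
        }
        grown.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        return grown;
    }
}
```

Feed it two snapshots taken a few hours apart (for example the output of jmap -histo:live saved to files) and look at the top entries.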
Re: Spark updateStateByKey fails with class leak when using case classes - resend
Interesting, only in local[*]! In the GitHub repo you pointed to, which main class were you running? TD On Mon, May 25, 2015 at 9:23 AM, rsearle eggsea...@verizon.net wrote: Further experimentation indicates these problems only occur when master is local[*]. There are no issues if a standalone cluster is used. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-updateStateByKey-fails-with-class-leak-when-using-case-classes-resend-tp22793p23020.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to monitor Spark Streaming from Kafka?
In the receiver-less direct approach, there is no concept of a consumer group, as we don't use the Kafka high-level consumer (which uses ZK). Instead, Spark Streaming manages offsets on its own, giving tighter guarantees. If you want to monitor the progress of the processing of offsets, you will have to update ZK yourself. With the code snippet you posted, you can get the range of offsets that were processed in each batch, and accordingly update ZooKeeper using some consumer group name of your choice. TD On Mon, Jun 1, 2015 at 2:23 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, What are some of the good/adopted approaches to monitoring Spark Streaming from Kafka? I see that there are things like http://quantifind.github.io/KafkaOffsetMonitor, for example. Do they all assume that receiver-based streaming is used? Then there is this note: "Note that one disadvantage of this approach (Receiverless Approach, #2) is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself." The code sample, however, seems sparse. What do you need to do here? - directKafkaStream.foreachRDD( new Function<JavaPairRDD<String, String>, Void>() { @Override public Void call(JavaPairRDD<String, String> rdd) throws IOException { OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); /* offsetRanges.length = # of Kafka partitions being consumed */ ... return null; } } ); and if these are updated, will KafkaOffsetMonitor work? Monitoring seems to center around the notion of a consumer group. But in the receiverless approach, code on the Spark consumer side doesn't seem to expose a consumer group parameter. Where does it go? Can I/should I just pass in group.id as part of the kafkaParams HashMap?
Thanks
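To make the "update ZooKeeper yourself" step concrete, here is a minimal Java sketch of what happens after each batch: take the batch's offset ranges and record each partition's untilOffset under a consumer group name of your choice. OffsetSpan and OffsetCommitter are hypothetical; OffsetSpan only mirrors the topic/partition/fromOffset/untilOffset fields of Spark's OffsetRange, and an in-memory map stands in for ZooKeeper. The key layout /consumers/<group>/offsets/<topic>/<partition> is the one used by the Kafka 0.8 high-level consumer, which is what ZooKeeper-based monitors read; verify it against the monitor you use.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mirror of the fields carried by Spark's OffsetRange.
final class OffsetSpan {
    final String topic;
    final int partition;
    final long fromOffset;   // inclusive
    final long untilOffset;  // exclusive

    OffsetSpan(String topic, int partition, long fromOffset, long untilOffset) {
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }
}

// Records per-partition progress for a consumer group name of your choosing.
final class OffsetCommitter {
    private final String groupId;
    // In-memory stand-in for ZooKeeper: znode path -> next offset to consume.
    private final Map<String, Long> store = new HashMap<>();

    OffsetCommitter(String groupId) { this.groupId = groupId; }

    // Kafka 0.8 high-level-consumer layout for committed offsets.
    static String path(String group, String topic, int partition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    // Call only after the batch's output has been written; untilOffset is exclusive,
    // so it is exactly the next offset a consumer of this group would read.
    void commit(OffsetSpan[] batchRanges) {
        for (OffsetSpan r : batchRanges) {
            store.put(path(groupId, r.topic, r.partition), r.untilOffset);
        }
    }

    Long committed(String topic, int partition) {
        return store.get(path(groupId, topic, partition));
    }
}
```

In a real job the commit would run inside foreachRDD after the batch's output has been written, using a ZooKeeper client instead of the map.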
Re: Recommended Scala version
Can you file a JIRA with detailed steps to reproduce the problem? On Fri, May 29, 2015 at 2:59 AM, Alex Nakos ana...@gmail.com wrote: Hi- I’ve just built the latest Spark RC from source (1.4.0 RC3) and can confirm that the Spark shell is still NOT working properly on 2.11. No classes in the jar I've specified with the --jars argument on the command line are available in the REPL. Cheers Alex On Thu, May 28, 2015 at 8:38 AM, Tathagata Das t...@databricks.com wrote: It would be great if you guys could test out the Spark 1.4.0 RC2 (RC3 coming out soon) with Scala 2.11 and report issues. TD On Tue, May 26, 2015 at 9:15 AM, Koert Kuipers ko...@tresata.com wrote: we are still running into issues with spark-shell not working on 2.11, but we are running on a somewhat older master, so maybe that has been resolved already. On Tue, May 26, 2015 at 11:48 AM, Dean Wampler deanwamp...@gmail.com wrote: Most of the 2.11 issues are being resolved in Spark 1.4. For a while, the Spark project has published Maven artifacts that are compiled with 2.11 and 2.10, although the downloads at http://spark.apache.org/downloads.html are still all for 2.10. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, May 26, 2015 at 10:33 AM, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote: Yes, the recommended version is 2.10, as not all features are supported on 2.11 yet. The Kafka libraries and JDBC components are yet to be ported to 2.11. So if your project doesn't depend on these components, you can give 2.11 a try. Here's a link https://spark.apache.org/docs/1.2.0/building-spark.html#building-for-scala-211 for building with 2.11. Though, you won't run into any issues if you use 2.10 as of now.
But then again, future releases will have to shift to 2.11 once support for 2.10 ends in the long run. On Tue, May 26, 2015 at 8:21 PM, Punyashloka Biswal punya.bis...@gmail.com wrote: Dear Spark developers and users, Am I correct in believing that the recommended version of Scala to use with Spark is currently 2.10? Is there any plan to switch to 2.11 in future? Are there any advantages to using 2.11 today? Regards, Punya
Re: Recommended Scala version
It would be great if you guys could test out the Spark 1.4.0 RC2 (RC3 coming out soon) with Scala 2.11 and report issues. TD On Tue, May 26, 2015 at 9:15 AM, Koert Kuipers ko...@tresata.com wrote: we are still running into issues with spark-shell not working on 2.11, but we are running on a somewhat older master, so maybe that has been resolved already. On Tue, May 26, 2015 at 11:48 AM, Dean Wampler deanwamp...@gmail.com wrote: Most of the 2.11 issues are being resolved in Spark 1.4. For a while, the Spark project has published Maven artifacts that are compiled with 2.11 and 2.10, although the downloads at http://spark.apache.org/downloads.html are still all for 2.10. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, May 26, 2015 at 10:33 AM, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote: Yes, the recommended version is 2.10, as not all features are supported on 2.11 yet. The Kafka libraries and JDBC components are yet to be ported to 2.11. So if your project doesn't depend on these components, you can give 2.11 a try. Here's a link https://spark.apache.org/docs/1.2.0/building-spark.html#building-for-scala-211 for building with 2.11. Though, you won't run into any issues if you use 2.10 as of now. But then again, future releases will have to shift to 2.11 once support for 2.10 ends in the long run. On Tue, May 26, 2015 at 8:21 PM, Punyashloka Biswal punya.bis...@gmail.com wrote: Dear Spark developers and users, Am I correct in believing that the recommended version of Scala to use with Spark is currently 2.10? Is there any plan to switch to 2.11 in future? Are there any advantages to using 2.11 today? Regards, Punya
Re: Spark Streaming from Kafka - no receivers and spark.streaming.receiver.maxRate?
You can throttle the receiver-less direct Kafka stream using spark.streaming.kafka.maxRatePerPartition http://spark.apache.org/docs/latest/configuration.html#spark-streaming On Wed, May 27, 2015 at 4:34 PM, Ted Yu yuzhih...@gmail.com wrote: Have you seen http://stackoverflow.com/questions/29051579/pausing-throttling-spark-spark-streaming-application ? Cheers On Wed, May 27, 2015 at 4:11 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, With the no-receivers approach to streaming from Kafka, is there a way to set something like spark.streaming.receiver.maxRate so as not to overwhelm the Spark consumers? What would be some of the ways to throttle the streamed messages so that the consumers don't run out of memory?
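The direct-stream cap is per partition and per second, so the number of records a single batch can pull is bounded by rate × batch length × partition count. A small Java sketch of that arithmetic; the class and method names are hypothetical, and the integer division mirrors the whole-record cap.

```java
// Hypothetical helper: upper bounds implied by spark.streaming.kafka.maxRatePerPartition.
final class DirectStreamThrottle {
    // Records one Kafka partition may contribute to one batch:
    // maxRatePerPartition (records/sec) * batch length (ms) / 1000, rounded down.
    static long maxRecordsPerPartition(long maxRatePerPartition, long batchMillis) {
        return maxRatePerPartition * batchMillis / 1000;
    }

    // Upper bound for the whole batch across all consumed partitions.
    static long maxRecordsPerBatch(long maxRatePerPartition, long batchMillis, int partitions) {
        return maxRecordsPerPartition(maxRatePerPartition, batchMillis) * partitions;
    }
}
```

For example, maxRatePerPartition = 1000 with 5-second batches over a 10-partition topic caps each batch at 50,000 records. The setting itself is passed like any other Spark config, e.g. --conf spark.streaming.kafka.maxRatePerPartition=1000 on spark-submit.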
Re: Spark Streaming - Design considerations/Knobs
Blocks are replicated immediately, before the driver launches any jobs using them. On Thu, May 21, 2015 at 2:05 AM, Hemant Bhanawat hemant9...@gmail.com wrote: Honestly, given the length of my email, I didn't expect a reply. :-) Thanks for reading and replying. However, I have a follow-up question: I don't think I understand block replication completely. Are the blocks replicated immediately after they are received by the receiver? Or are they kept on the receiver node only and moved only on shuffle? Does replication have anything to do with locality.wait? Thanks, Hemant On Thu, May 21, 2015 at 2:21 AM, Tathagata Das t...@databricks.com wrote: Correcting the ones that are incorrect or incomplete. BUT this is a good list of things to remember about Spark Streaming. On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat hemant9...@gmail.com wrote: Hi, I have compiled a list (from online sources) of knobs/design considerations that need to be taken care of by applications running on Spark Streaming. Is my understanding correct? Is there any other important design consideration that I should take care of?
- A DStream is associated with a single receiver. For attaining read parallelism, multiple receivers, i.e. multiple DStreams, need to be created.
- A receiver runs within an executor. It occupies one core. Ensure that there are enough cores for processing after receiver slots are booked, i.e. spark.cores.max should take the receiver slots into account.
- The receivers are allocated to executors in a round-robin fashion.
- When data is received from a stream source, the receiver creates blocks of data. A new block of data is generated every blockInterval milliseconds. N blocks of data are created during the batchInterval, where N = batchInterval/blockInterval.
- These blocks are distributed by the BlockManager of the current executor to the block managers of other executors. After that, the Network Input Tracker running on the driver is informed about the block locations for further processing.
- An RDD is created on the driver for the blocks created during the batchInterval. The blocks generated during the batchInterval are partitions of the RDD. Each partition is a task in Spark. blockInterval == batchInterval would mean that a single partition is created and probably it is processed locally. The map tasks on the blocks are processed in the executors that have the blocks (the one that received the block, and another where the block was replicated), irrespective of block interval, unless non-local scheduling kicks in (as you observed next).
- Having a bigger blockInterval means bigger blocks. A high value of spark.locality.wait increases the chance of processing a block on the local node. A balance needs to be found between these two parameters to ensure that the bigger blocks are processed locally.
- Instead of relying on batchInterval and blockInterval, you can define the number of partitions by calling dstream.repartition(n). This reshuffles the data in the RDD randomly to create n partitions. Yes, for greater parallelism, though it comes at the cost of a shuffle.
- An RDD's processing is scheduled by the driver's JobScheduler as a job. At a given point in time only one job is active. So, if one job is executing, the other jobs are queued.
- If you have two DStreams, there will be two RDDs formed and two jobs created, which will be scheduled one after another.
- To avoid this, you can union the two DStreams. This ensures that a single UnionRDD is formed for the two RDDs of the DStreams. This UnionRDD is then considered as a single job. However, the partitioning of the RDDs is not impacted. To further clarify, the jobs depend on the number of output operations (print, foreachRDD, saveAsXFiles) and the number of RDD actions in those output operations.
dstream1.union(dstream2).foreachRDD { rdd => rdd.count() } // one Spark job per batch
dstream1.union(dstream2).foreachRDD { rdd => { rdd.count() ; rdd.count() } } // TWO Spark jobs per batch
dstream1.foreachRDD { rdd => rdd.count } ; dstream2.foreachRDD { rdd => rdd.count } // TWO Spark jobs per batch
- If the batch processing time is more than batchInterval, then obviously the receiver's memory will start filling up and will eventually throw exceptions (most probably BlockNotFoundException). Currently there is no way to pause the receiver. You can limit the rate of the receiver using the SparkConf config spark.streaming.receiver.maxRate
- For being fully fault tolerant, Spark Streaming needs to enable checkpointing. Checkpointing increases the batch processing time. Incomplete. There are two types of checkpointing: data and metadata. Only data checkpointing, needed by only some operations, increases batch
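The sizing rules in Hemant's list reduce to simple arithmetic. A minimal Java sketch, assuming receiver-based input, a batch interval that is a multiple of the block interval, and that each receiver pins exactly one core; StreamingSizing is a hypothetical name. The 200 ms figure in the example is the default spark.streaming.blockInterval in Spark 1.x.

```java
// Hypothetical helper for the receiver/block/partition arithmetic discussed above.
final class StreamingSizing {
    // Blocks (and hence RDD partitions) one receiver produces per batch: N = batchInterval / blockInterval.
    static long blocksPerReceiver(long batchIntervalMs, long blockIntervalMs) {
        return batchIntervalMs / blockIntervalMs;
    }

    // Partitions of the batch RDD when the receivers' DStreams are unioned (a union keeps all partitions).
    static long partitionsPerBatch(int receivers, long batchIntervalMs, long blockIntervalMs) {
        return receivers * blocksPerReceiver(batchIntervalMs, blockIntervalMs);
    }

    // Each receiver occupies one core, so only the remainder is left for processing tasks.
    static int processingCores(int totalCores, int receivers) {
        if (totalCores <= receivers) {
            throw new IllegalArgumentException("no cores left for processing");
        }
        return totalCores - receivers;
    }
}
```

With a 2 s batch, the default 200 ms block interval and three unioned receivers, each batch RDD has 3 × 10 = 30 partitions, and on an 8-core allocation five cores are left for those tasks.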
Re: Trying to connect to many topics with several DirectConnect
Can you show us the rest of the program? When are you starting or stopping the context? Is the exception occurring right after start or stop? What do the log4j logs say? On Fri, May 22, 2015 at 7:12 AM, Cody Koeninger c...@koeninger.org wrote: I just verified that the following code works on 1.3.0:
val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic1)
val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic2)
stream1.print()
stream2.print()
So something else is probably going on in your case. See if simply printing the two streams works for you, then compare what's different in your actual job. On Fri, May 22, 2015 at 6:50 AM, Guillermo Ortiz konstt2...@gmail.com wrote: Hi, I'm trying to connect to two Kafka topics from Spark with DirectStream, but I get an error. I don't know if there is any limitation on doing this, because when I access just one topic everything is fine.
val ssc = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, String]("metadata.broker.list" -> "quickstart.cloudera:9092")
val setTopic1 = Set("topic1")
val setTopic2 = Set("topic2")
val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, setTopic1)
val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, setTopic2)
The error that I get is:
15/05/22 13:12:40 INFO BlockManager: Removing RDD 314
15/05/22 13:12:40 ERROR OneForOneStrategy:
java.lang.NullPointerException
 at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
 at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
 at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
 at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
 at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
 at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
Is there any limitation on doing this?
Re: Storing spark processed output to Database asynchronously.
Something does not make sense. Receivers (currently) do not get blocked (unless a rate limit has been set) due to processing load. The receiver will continue to receive data and store it in memory until it is processed. So I am still not sure how the data loss is happening, unless you are sending data at a faster rate than the receiver can handle (that is, more than the max rate at which the receiver can save data in memory and replicate it to other nodes). In general, if you are particular about data loss, then UDP is not really a good choice in the first place. If you can, try using TCP. It would at least eliminate the possibility that I mentioned above. Ultimately, if you try sending data faster than the receiver can handle (independent of whether processing can handle it), then you will lose data if you are using UDP. You have to use TCP to naturally control the sending rate to match the receiving rate in the receiver, without dropping data. On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj gautam1...@gmail.com wrote: This is just a friendly ping to remind you of my query. Also, is there an explanation/example of the usage of AsyncRDDActions in Java? On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj gautam1...@gmail.com wrote: I am receiving data on UDP port 8060, processing it using Spark, and storing the output in Neo4j. But the data I'm receiving and the data that is getting stored don't match, probably because the Neo4j API takes too long to push the data into the database. Meanwhile, Spark is unable to receive data, probably because the process is blocked. On Thu, May 21, 2015 at 5:28 PM, Tathagata Das t...@databricks.com wrote: Can you elaborate on how the data loss is occurring? On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj gautam1...@gmail.com wrote: That is completely alright, as the system will make sure the work gets done. My major concern is the data drop. Will using async stop data loss?
On Thu, May 21, 2015 at 4:55 PM, Tathagata Das t...@databricks.com wrote: If you cannot push data as fast as you are generating it, then async isn't going to help either. The work is just going to keep piling up as many, many async jobs, even though your batch processing times will be low, as that processing time is not going to reflect how much of the overall work is pending in the system. On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj gautam1...@gmail.com wrote: Hi, From my understanding of Spark Streaming, I created a Spark entry point for continuous UDP data using: SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount"); JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1)); JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver(8060)); Now, when I process this input stream using: JavaDStream hash=lines.flatMap(my-code) JavaPairDStream tuple= hash.mapToPair(my-code) JavaPairDStream output= tuple.reduceByKey(my-code) output.foreachRDD( new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>(){ @Override public Void call( JavaPairRDD<String, ArrayList<String>> arg0, Time arg1) throws Exception { /* TODO Auto-generated method stub */ new AsyncRDDActions(arg0.rdd(), null); arg0.foreachPartition( new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>(){ @Override public void call( Iterator<Tuple2<String, ArrayList<String>>> arg0) throws Exception { /* TODO Auto-generated method stub */ GraphDatabaseService graphDb = new GraphDatabaseFactory().newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/") .setConfig(remote_shell_enabled, "true") .newGraphDatabase(); try (Transaction tx = graphDb.beginTx()) { while (arg0.hasNext()) { Tuple2<String, ArrayList<String>> tuple = arg0.next(); Node HMac=Neo4jOperations.getHMacFromValue(graphDb, tuple._1); boolean oldHMac=false; if (HMac!= null){ System.out.println("Already in Database: " + tuple._1); oldHMac=true
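TD's point that async writes only hide the problem can be seen with a toy model: if every batch hands the sink more records than it can write in one batch interval, the pending work grows by the difference every batch, whether the writes are issued synchronously or asynchronously. A minimal Java sketch with made-up rates; BacklogModel is hypothetical and ignores everything except arrival and write throughput.

```java
// Hypothetical toy model of pending writes when a sink is slower than the input.
final class BacklogModel {
    // Records still waiting to be written after each batch, assuming arrivalsPerBatch records
    // arrive every batch and the sink can absorb at most writesPerBatch records per batch interval.
    static long[] pendingAfterEachBatch(long arrivalsPerBatch, long writesPerBatch, int batches) {
        long[] pending = new long[batches];
        long backlog = 0;
        for (int i = 0; i < batches; i++) {
            backlog += arrivalsPerBatch;                  // batch output handed to the sink (sync or async)
            backlog -= Math.min(backlog, writesPerBatch); // sink drains what it can in one interval
            pending[i] = backlog;
        }
        return pending;
    }
}
```

With 1,000 records arriving per batch and a sink that absorbs 800, the backlog after three batches is 200, 400 and 600 records and keeps growing; only a faster sink (or a lower input rate) brings it back to zero.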
Re: [Streaming] Non-blocking recommendation in custom receiver documentation and KinesisReceiver's worker.run blocking call
Thanks for the JIRA. I will look into this issue. TD On Thu, May 21, 2015 at 1:31 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I ran into an issue that is potentially caused by this and have logged a JIRA bug - https://issues.apache.org/jira/browse/SPARK-7788 Thanks, Aniket On Wed, Sep 24, 2014 at 12:59 PM Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Hi all, Spark Streaming's custom receiver documentation recommends that the onStart and onStop methods should not block indefinitely. However, looking at the source code of KinesisReceiver, the onStart method calls worker.run, which blocks until the worker is shut down (via a call to onStop). So, my question is: what are the ramifications of making a blocking call in onStart, and is this something that should be addressed in the KinesisReceiver implementation? Thanks, Aniket
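The pattern the custom receiver guide asks for is to start the blocking work on its own thread in onStart() and return at once, and to signal that thread from onStop(). The Java sketch below shows that pattern without Spark on the classpath; BackgroundWorkerReceiver is hypothetical, and blockingRun() stands in for a call such as the Kinesis worker.run(). A real receiver would extend org.apache.spark.streaming.receiver.Receiver and call store(...) instead of bumping a counter.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical receiver-like class: onStart() returns immediately, the blocking loop runs elsewhere.
final class BackgroundWorkerReceiver {
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AtomicLong recordsHandled = new AtomicLong();
    private Thread worker;

    void onStart() {
        stopped.set(false);
        worker = new Thread(this::blockingRun, "receiver-worker");
        worker.setDaemon(true);
        worker.start(); // returns right away; the loop keeps running on the worker thread
    }

    // Plays the role of a call that blocks until shutdown, such as worker.run().
    private void blockingRun() {
        while (!stopped.get()) {
            recordsHandled.incrementAndGet(); // a real receiver would store(...) a record here
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void onStop() {
        stopped.set(true); // ask the loop to finish, then wait briefly for it
        if (worker == null) return;
        try {
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean workerAlive() { return worker != null && worker.isAlive(); }

    long recordsHandled() { return recordsHandled.get(); }
}
```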
Re: Storing spark processed output to Database asynchronously.
If you cannot push data as fast as you are generating it, then async isn't going to help either. The work is just going to keep piling up as many, many async jobs, even though your batch processing times will be low, as that processing time is not going to reflect how much of the overall work is pending in the system. On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj gautam1...@gmail.com wrote: Hi, From my understanding of Spark Streaming, I created a Spark entry point for continuous UDP data using: SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount"); JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1)); JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver(8060)); Now, when I process this input stream using: JavaDStream hash=lines.flatMap(my-code) JavaPairDStream tuple= hash.mapToPair(my-code) JavaPairDStream output= tuple.reduceByKey(my-code) output.foreachRDD( new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>(){ @Override public Void call( JavaPairRDD<String, ArrayList<String>> arg0, Time arg1) throws Exception { /* TODO Auto-generated method stub */ new AsyncRDDActions(arg0.rdd(), null); arg0.foreachPartition( new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>(){ @Override public void call( Iterator<Tuple2<String, ArrayList<String>>> arg0) throws Exception { /* TODO Auto-generated method stub */ GraphDatabaseService graphDb = new GraphDatabaseFactory().newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/") .setConfig(remote_shell_enabled, "true") .newGraphDatabase(); try (Transaction tx = graphDb.beginTx()) { while (arg0.hasNext()) { Tuple2<String, ArrayList<String>> tuple = arg0.next(); Node HMac=Neo4jOperations.getHMacFromValue(graphDb, tuple._1); boolean oldHMac=false; if (HMac!= null){ System.out.println("Already in Database: " + tuple._1); oldHMac=true; } else HMac=Neo4jOperations.createHMac(graphDb, tuple._1); ArrayList<String> zipcodes=tuple._2; for(String zipcode : zipcodes){ Node Zipcode=Neo4jOperations.getZipcodeFromValue(graphDb, zipcode); if(Zipcode!=null){ System.out.println("Already in Database: " + zipcode); if(oldHMac==true && Neo4jOperations.getRelationshipBetween(HMac, Zipcode)!=null) Neo4jOperations.updateToCurrentTime(HMac, Zipcode); else Neo4jOperations.travelTo(HMac, Zipcode); } else{ Zipcode=Neo4jOperations.createZipcode(graphDb, zipcode); Neo4jOperations.travelTo(HMac, Zipcode); } } } tx.success(); } graphDb.shutdown(); } }); return null; } });
The code in output.foreachRDD pushes Spark's output into the Neo4j database, checking for duplicate values. This part of the code is very time-consuming, so my processing time exceeds the batch time. Because of that, it
Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error
It looks like, somehow, the file size reported by the FSInputStream of Tachyon's FileSystem interface is zero. On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Just to follow up on this thread further. I was doing some fault-tolerance testing of Spark Streaming with Tachyon as the OFF_HEAP block store. As I said in an earlier email, I was able to solve the BlockNotFound exception when I used the Hierarchical Storage of Tachyon, which is good. I continued doing some testing around storing the Spark Streaming WAL and checkpoint files in Tachyon as well. Here are a few findings: When I store the Spark Streaming checkpoint location in Tachyon, the throughput is much higher. I tested the Driver and Receiver failure cases, and Spark Streaming is able to recover without any data loss on Driver failure. *But on Receiver failure, Spark Streaming loses data*, as I see an exception while reading the WAL file from the Tachyon receivedData location for the same receiver id which just failed. If I change the checkpoint location back to HDFS, Spark Streaming can recover from both Driver and Receiver failure. Here are the log details from when the Spark Streaming receiver failed. I raised a JIRA for the same issue: https://issues.apache.org/jira/browse/SPARK-7525 INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2 (epoch 1)* INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to remove executor 2 from BlockManagerMaster.
INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing block manager BlockManagerId(2, 10.252.5.54, 45789) INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2 successfully in removeExecutor INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered receiver for stream 2 from 10.252.5.62*:47255 WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)* at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.lang.IllegalArgumentException: *Seek position is past EOF: 645603894, fileSize = 0* at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239) at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37) at org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37) at org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141) ... 15 more INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes) INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in stage 103.0 (TID 422) on executor 10.252.5.61: org.apache.spark.SparkException (Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)) [duplicate 1] INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3 in stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes) INFO : org.apache.spark.deploy.client.AppClient$ClientActor -
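The log shows the reader asking for the WAL record at offset 645603894 (length 10891919) in a file whose reported size is 0. As a rough illustration of why that fails, here is a stand-alone Java sketch that addresses a record by (path, offset, length) and checks the bounds before seeking. It assumes each record is stored as a 4-byte length prefix followed by the payload; the class and method names are hypothetical, and this is not Spark's FileBasedWriteAheadLogRandomReader.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: a WAL record addressed by (file, offset, length), stored
// as a 4-byte length prefix followed by the payload. Not Spark's own reader.
final class WalSegmentSketch {

    /** Writes length-prefixed records to a fresh file and returns each record's offset. */
    static long[] writeRecords(File file, String... records) {
        long[] offsets = new long[records.length];
        try (RandomAccessFile out = new RandomAccessFile(file, "rw")) {
            out.setLength(0); // start from an empty file
            for (int i = 0; i < records.length; i++) {
                byte[] payload = records[i].getBytes(StandardCharsets.UTF_8);
                offsets[i] = out.getFilePointer();
                out.writeInt(payload.length); // 4-byte length prefix
                out.write(payload);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return offsets;
    }

    /** Reads one record, checking bounds before seeking (compare "fileSize = 0" in the log). */
    static String readRecord(File file, long offset, int length) {
        try (RandomAccessFile in = new RandomAccessFile(file, "r")) {
            long fileSize = in.length();
            if (offset < 0 || offset + 4L + length > fileSize) {
                throw new IllegalStateException("Record at " + offset + " (length " + length
                        + ") is past EOF, fileSize = " + fileSize);
            }
            in.seek(offset);
            int storedLength = in.readInt();
            if (storedLength != length) {
                throw new IllegalStateException("Expected length " + length + " but found " + storedLength);
            }
            byte[] payload = new byte[length];
            in.readFully(payload);
            return new String(payload, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the file system reports a size of 0 for a file that was in fact written, as appears to happen with Tachyon here, a check like this (or the seek itself) fails even though the data may exist.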
Re: Connecting to an inmemory database from Spark
Doesn't seem like a Cassandra-specific issue. Could you give us more information (code, errors, stack traces)? On Thu, May 21, 2015 at 1:33 PM, tshah77 tejasrs...@gmail.com wrote: TD, do you have any example of reading from Cassandra using Spark Streaming in Java? I am trying to connect to Cassandra using Spark Streaming and it is throwing the error "could not parse master url". Thanks Tejas -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-to-an-inmemory-database-from-Spark-tp1343p22979.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
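Since no code was posted, the following is only a guess at where "could not parse master url" comes from: Spark raises it when the master string passed to setMaster (or --master) does not match any form it recognizes. The hypothetical Java helper below checks a value against a simplified subset of the Spark 1.x master URL forms (local, local[N], local[*], spark://..., mesos://..., yarn-client, yarn-cluster). Spark itself accepts more variants, so treat a false result as a hint, not a verdict.

```java
import java.util.regex.Pattern;

// Hypothetical helper: a simplified subset of the master URL forms Spark 1.x accepts.
final class MasterUrlCheck {
    private static final Pattern[] KNOWN_FORMS = {
        Pattern.compile("local"),                   // one local thread
        Pattern.compile("local\\[([0-9]+|\\*)\\]"), // local[N] or local[*]
        Pattern.compile("spark://.+"),              // standalone master(s)
        Pattern.compile("(mesos|zk)://.+"),         // Mesos
        Pattern.compile("yarn-(client|cluster)")    // YARN modes in 1.x
    };

    /** Returns true if the value matches one of the forms above. */
    static boolean looksLikeMasterUrl(String master) {
        if (master == null) {
            return false;
        }
        for (Pattern form : KNOWN_FORMS) {
            if (form.matcher(master).matches()) {
                return true;
            }
        }
        return false;
    }
}
```

For example, a bare host:port such as a database address is not a master URL and would be rejected here.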
Re: Spark Streaming - Design considerations/Knobs
Correcting the ones that are incorrect or incomplete. BUT this is a good list of things to remember about Spark Streaming. On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat hemant9...@gmail.com wrote: Hi, I have compiled a list (from online sources) of knobs/design considerations that need to be taken care of by applications running on Spark Streaming. Is my understanding correct? Is there any other important design consideration that I should take care of?
- A DStream is associated with a single receiver. For attaining read parallelism, multiple receivers, i.e. multiple DStreams, need to be created.
- A receiver runs within an executor. It occupies one core. Ensure that there are enough cores for processing after receiver slots are booked, i.e. spark.cores.max should take the receiver slots into account.
- The receivers are allocated to executors in a round-robin fashion.
- When data is received from a stream source, the receiver creates blocks of data. A new block of data is generated every blockInterval milliseconds. N blocks of data are created during the batchInterval, where N = batchInterval/blockInterval.
- These blocks are distributed by the BlockManager of the current executor to the block managers of other executors. After that, the Network Input Tracker running on the driver is informed about the block locations for further processing.
- An RDD is created on the driver for the blocks created during the batchInterval. The blocks generated during the batchInterval are partitions of the RDD. Each partition is a task in Spark. blockInterval == batchInterval would mean that a single partition is created and probably it is processed locally. The map tasks on the blocks are processed in the executors (one that received the block, and another where the block was replicated) that have the blocks irrespective of block interval, unless non-local scheduling kicks in (as you observed next).
- Having a bigger blockInterval means bigger blocks. A high value of spark.locality.wait increases the chance of processing a block on the local node. A balance needs to be found between these two parameters to ensure that the bigger blocks are processed locally.
- Instead of relying on batchInterval and blockInterval, you can define the number of partitions by calling dstream.repartition(n). This reshuffles the data in the RDD randomly to create n partitions. Yes, for greater parallelism, though it comes at the cost of a shuffle.
- An RDD's processing is scheduled by the driver's JobScheduler as a job. At a given point in time only one job is active. So, if one job is executing, the other jobs are queued.
- If you have two DStreams, there will be two RDDs formed and two jobs created, which will be scheduled one after the other.
- To avoid this, you can union the two DStreams. This ensures that a single UnionRDD is formed for the two RDDs of the DStreams. This UnionRDD is then considered as a single job. However, the partitioning of the RDDs is not impacted. To further clarify, the jobs depend on the number of output operations (print, foreachRDD, saveAsXFiles) and the number of RDD actions in those output operations.
  dstream1.union(dstream2).foreachRDD { rdd => rdd.count() } // one Spark job per batch
  dstream1.union(dstream2).foreachRDD { rdd => { rdd.count() ; rdd.count() } } // TWO Spark jobs per batch
  dstream1.foreachRDD { rdd => rdd.count } ; dstream2.foreachRDD { rdd => rdd.count } // TWO Spark jobs per batch
- If the batch processing time is more than the batchInterval, then obviously the receiver's memory will start filling up and will end up throwing exceptions (most probably BlockNotFoundException). Currently there is no way to pause the receiver. You can limit the rate of the receiver using the SparkConf config spark.streaming.receiver.maxRate.
- For being fully fault tolerant, Spark Streaming needs to enable checkpointing. Checkpointing increases the batch processing time. Incomplete. There are two types of checkpointing: data and metadata. Only data checkpointing, needed by only some operations, increases batch processing time. Read - http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing Furthermore, with checkpointing you can recover computation, but you may lose some data (that was received but not processed before the driver failed) for some sources. Enabling write ahead logs and a reliable source + receiver allows zero data loss. Read - WAL in http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics
- The frequency of metadata checkpoint cleaning can be controlled using spark.cleaner.ttl. But data checkpoint cleaning happens automatically when the RDDs in the checkpoint are no longer required. Incorrect. metadata checkpointing or
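The sizing rules in the list above boil down to a little arithmetic. The Java sketch below restates three of them: N = batchInterval / blockInterval partitions per receiver per batch, one core reserved per receiver, and spark.streaming.receiver.maxRate read as records per second per receiver (as the Spark configuration docs describe it). The class name and example numbers are hypothetical, and the partition formula assumes blockInterval divides batchInterval evenly.

```java
// Hypothetical back-of-the-envelope helper for the receiver knobs discussed above.
final class StreamingSizing {

    /** Blocks (and hence RDD partitions) produced by one receiver per batch. */
    static long partitionsPerBatch(long batchIntervalMs, long blockIntervalMs) {
        if (blockIntervalMs <= 0 || batchIntervalMs < blockIntervalMs) {
            throw new IllegalArgumentException("need 0 < blockInterval <= batchInterval");
        }
        return batchIntervalMs / blockIntervalMs;
    }

    /** Cores left for processing after each receiver has taken one core. */
    static int coresForProcessing(int coresMax, int receivers) {
        int left = coresMax - receivers;
        if (left <= 0) {
            throw new IllegalArgumentException("no cores left to process received data");
        }
        return left;
    }

    /** Upper bound on records ingested per batch when maxRate (records/s per receiver) is set. */
    static long maxRecordsPerBatch(long maxRatePerReceiver, int receivers, long batchIntervalMs) {
        return maxRatePerReceiver * receivers * batchIntervalMs / 1000;
    }
}
```

For example, a 2 s batch with a 200 ms block interval gives 10 partitions per receiver, and with spark.cores.max = 8 and 2 receivers only 6 cores are left for processing.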
Re: Spark Streaming graceful shutdown in Spark 1.4
If you are talking about handling driver crash failures, then all bets are off anyway! Adding a shutdown hook in the hope of handling driver process failure handles only some cases (Ctrl-C), but does not handle cases like SIGKILL (which does not run JVM shutdown hooks) or a driver machine crash. So it's not a good idea to rely on that. Nonetheless, I have opened a PR to handle the shutdown of the StreamingContext in the same way as the SparkContext. https://github.com/apache/spark/pull/6307 On Tue, May 19, 2015 at 12:51 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Thanks Sean. You are right. If the driver program is running, then I can handle shutdown in the main exit path. But if the driver machine crashes (or if you just stop the application, for example by killing the driver process), then a shutdown hook is the only option, isn't it? What I am trying to say is that just calling ssc.stop in sys.ShutdownHookThread or Runtime.getRuntime().addShutdownHook (in Java) won't work anymore. I need to use Utils.addShutdownHook with a priority. So I'm just checking whether Spark Streaming can make graceful shutdown the default shutdown mechanism. Dibyendu On Tue, May 19, 2015 at 1:03 PM, Sean Owen so...@cloudera.com wrote: I don't think you should rely on a shutdown hook. Ideally you try to stop it in the main exit path of your program, even in case of an exception. On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: You mean to say that within Runtime.getRuntime().addShutdownHook I call ssc.stop(stopSparkContext = true, stopGracefully = true)? This won't work anymore in 1.4. The SparkContext got stopped before the receiver processed all received blocks, and I see the exception below in the logs. Graceful shutdown only works if I add Utils.addShutdownHook with the priority I mentioned. In that case the shutdown hooks run in priority order.
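Sean's advice (stop the context on the main exit path, even when an exception is thrown) maps naturally onto try/finally. In the Java sketch below, StoppableContext is a hypothetical stand-in for JavaStreamingContext so that the example compiles without Spark; with the real class you would call its stop(stopSparkContext, stopGracefully) method instead. As TD notes, neither this nor a shutdown hook helps with SIGKILL or a machine crash.

```java
// Hypothetical sketch of stopping on the main exit path. StoppableContext stands in
// for JavaStreamingContext so this compiles without Spark.
final class MainExitPathStop {

    interface StoppableContext {
        void start();
        void awaitTermination();
        void stop(boolean stopSparkContext, boolean stopGracefully);
    }

    /** Starts the context and always attempts a graceful stop on the way out. */
    static void runAndAlwaysStop(StoppableContext ssc) {
        try {
            ssc.start();
            ssc.awaitTermination();
        } finally {
            // Runs on normal return and when an exception propagates,
            // but not on SIGKILL or a driver machine crash.
            ssc.stop(true, true);
        }
    }
}
```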
Re: [Unit Test Failure] Test org.apache.spark.streaming.JavaAPISuite.testCount failed
Has this been fixed for you now? There have been a number of patches since then, and it may have been fixed. On Thu, May 14, 2015 at 7:20 AM, Wangfei (X) wangf...@huawei.com wrote: Yes, it fails repeatedly on my local Jenkins. On May 14, 2015, at 18:30, Tathagata Das t...@databricks.com wrote: Do you get this failure repeatedly? On Thu, May 14, 2015 at 12:55 AM, kf wangf...@huawei.com wrote: Hi all, I got the following error when I ran Spark's unit tests with dev/run-tests on the latest branch-1.4. The latest commit id: commit d518c0369fa412567855980c3f0f426cde5c190d Author: zsxwing zsxw...@gmail.com Date: Wed May 13 17:58:29 2015 -0700 error [info] Test org.apache.spark.streaming.JavaAPISuite.testCount started [error] Test org.apache.spark.streaming.JavaAPISuite.testCount failed: org.apache.spark.SparkException: Error communicating with MapOutputTracker [error] at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113) [error] at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:119) [error] at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:324) [error] at org.apache.spark.SparkEnv.stop(SparkEnv.scala:93) [error] at org.apache.spark.SparkContext.stop(SparkContext.scala:1577) [error] at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:626) [error] at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:597) [error] at org.apache.spark.streaming.TestSuiteBase$class.runStreamsWithPartitions(TestSuiteBase.scala:403) [error] at org.apache.spark.streaming.JavaTestUtils$.runStreamsWithPartitions(JavaTestUtils.scala:102) [error] at org.apache.spark.streaming.TestSuiteBase$class.runStreams(TestSuiteBase.scala:344) [error] at org.apache.spark.streaming.JavaTestUtils$.runStreams(JavaTestUtils.scala:102) [error] at org.apache.spark.streaming.JavaTestBase$class.runStreams(JavaTestUtils.scala:74) [error] at
org.apache.spark.streaming.JavaTestUtils$.runStreams(JavaTestUtils.scala:102) [error] at org.apache.spark.streaming.JavaTestUtils.runStreams(JavaTestUtils.scala) [error] at org.apache.spark.streaming.JavaAPISuite.testCount(JavaAPISuite.java:103) [error] ... [error] Caused by: org.apache.spark.SparkException: Error sending message [message = StopMapOutputTracker] [error] at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116) [error] at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78) [error] at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:109) [error] ... 52 more [error] Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds] [error] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) [error] at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) [error] at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) [error] at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) [error] at scala.concurrent.Await$.result(package.scala:107) [error] at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102) [error] ... 54 more
Re: Spark Streaming graceful shutdown in Spark 1.4
If you wanted to stop it gracefully, then why are you not calling ssc.stop(stopGracefully = true, stopSparkContext = true)? Then it doesn't matter whether the shutdown hook was called or not. TD On Mon, May 18, 2015 at 9:43 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi, I just figured out that if I want to perform a graceful shutdown of Spark Streaming 1.4 (from master), Runtime.getRuntime().addShutdownHook no longer works. In Spark 1.4 there is a Utils.addShutdownHook defined for Spark Core that gets called anyway, which makes the graceful shutdown of Spark Streaming fail with an error like "SparkContext already closed". To solve this, I need to explicitly add Utils.addShutdownHook in my driver with a higher priority (say 150) than Spark's shutdown priority of 50, and in it I call the StreamingContext stop method with the (false, true) parameters. I'm just curious to know if this is how we need to handle the shutdown hook going forward. Can't we make the streaming shutdown default to a graceful shutdown? Also, the Java API for adding a shutdown hook in Utils looks very dirty, with methods like this:
Utils.addShutdownHook(150, new Function0<BoxedUnit>() {
  @Override public BoxedUnit apply() { return null; }
  @Override public byte apply$mcB$sp() { return 0; }
  @Override public char apply$mcC$sp() { return 0; }
  @Override public double apply$mcD$sp() { return 0; }
  @Override public float apply$mcF$sp() { return 0; }
  @Override public int apply$mcI$sp() {
    // TODO Auto-generated method stub
    return 0;
  }
  @Override public long apply$mcJ$sp() { return 0; }
  @Override public short apply$mcS$sp() { return 0; }
  @Override public void apply$mcV$sp() { jsc.stop(false, true); }
  @Override public boolean apply$mcZ$sp() {
    // TODO Auto-generated method stub
    return false;
  }
});
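Dibyendu's workaround relies on hooks running in priority order, with his hook at 150 running before Spark's at 50. The Java sketch below models that idea with a single JVM shutdown hook draining a priority queue, highest priority first, ties in registration order. It is a hypothetical illustration of the ordering, not Spark's internal hook manager, and the priorities are just the numbers from the thread.

```java
import java.util.PriorityQueue;

// Hypothetical model of priority-ordered shutdown hooks (not Spark's implementation).
final class PriorityShutdownHooks {

    private static final class Hook implements Comparable<Hook> {
        final int priority;
        final long order;
        final Runnable body;

        Hook(int priority, long order, Runnable body) {
            this.priority = priority;
            this.order = order;
            this.body = body;
        }

        @Override
        public int compareTo(Hook other) {
            int byPriority = Integer.compare(other.priority, priority); // higher priority first
            return byPriority != 0 ? byPriority : Long.compare(order, other.order);
        }
    }

    private final PriorityQueue<Hook> hooks = new PriorityQueue<>();
    private long nextOrder = 0;

    synchronized void add(int priority, Runnable body) {
        hooks.add(new Hook(priority, nextOrder++, body));
    }

    /** Runs every registered hook, highest priority first; one failure does not stop the rest. */
    synchronized void runAll() {
        Hook hook;
        while ((hook = hooks.poll()) != null) {
            try {
                hook.body.run();
            } catch (RuntimeException e) {
                System.err.println("Shutdown hook failed: " + e);
            }
        }
    }

    /** Registers one JVM shutdown hook that drains the queue at exit. */
    void installJvmHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::runAll, "priority-shutdown-hooks"));
    }
}
```

Registering the streaming stop at 150 and the SparkContext stop at 50 makes the graceful streaming stop run first, which is the ordering the workaround depends on.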
Re: [SparkStreaming] Is it possible to delay the start of some DStream in the application?
If you want the fileStream to start only after a certain event has happened, why not start the StreamingContext after that event? TD On Sun, May 17, 2015 at 7:51 PM, Haopu Wang hw...@qilinsoft.com wrote: I want to use a file stream as input. I looked at the Spark Streaming documentation again, and it says a file stream doesn't need a receiver at all. So I'm wondering if I can control a specific DStream instance. -- *From:* Evo Eftimov [mailto:evo.efti...@isecc.com] *Sent:* Monday, May 18, 2015 12:39 AM *To:* 'Akhil Das'; Haopu Wang *Cc:* 'user' *Subject:* RE: [SparkStreaming] Is it possible to delay the start of some DStream in the application? You can make ANY *standard* receiver sleep by implementing a custom Message Deserializer class with a sleep method inside it. *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com] *Sent:* Sunday, May 17, 2015 4:29 PM *To:* Haopu Wang *Cc:* user *Subject:* Re: [SparkStreaming] Is it possible to delay the start of some DStream in the application? Why not just trigger your batch job with that event? If you really need streaming, then you can create a custom receiver and make the receiver sleep until the event has happened. That will obviously run your streaming pipelines without having any data to process. Thanks Best Regards On Fri, May 15, 2015 at 4:39 AM, Haopu Wang hw...@qilinsoft.com wrote: In my application, I want to start a DStream computation only after a special event has happened (for example, I want to start the receiver only after the reference data has been properly initialized). My question is: it looks like the DStream will be started right after the StreamingContext has been started. Is it possible to delay the start of a specific DStream? Thank you very much!
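TD's suggestion, delaying ssc.start() until the event has happened, can be sketched in plain Java with a CountDownLatch. Below, startStreaming is a hypothetical Runnable standing in for the call to ssc.start(), and markReferenceDataReady is whatever code finishes loading the reference data; the timeout simply avoids blocking the driver forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: start streaming only after a one-time event has happened.
final class StartAfterEvent {
    private final CountDownLatch referenceDataReady = new CountDownLatch(1);

    /** Called by whatever code finishes loading the reference data. */
    void markReferenceDataReady() {
        referenceDataReady.countDown();
    }

    /** Waits up to the timeout for the event, then starts; returns false if it never happened. */
    boolean startWhenReady(Runnable startStreaming, long timeout, TimeUnit unit) {
        try {
            if (!referenceDataReady.await(timeout, unit)) {
                return false; // event did not happen in time: do not start
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        startStreaming.run(); // e.g. ssc.start() in a real driver
        return true;
    }
}
```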
Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond
For the Spark Streaming app, if you want a particular action inside a foreachRDD to go to a particular pool, then make sure you set the pool within the foreachRDD function. E.g.
dstream.foreachRDD { rdd =>
  rdd.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1") // set the pool
  rdd.count() // or whatever job
}
This will ensure that the jobs will be allocated to the desired pool. LMK if this works. TD On Fri, May 15, 2015 at 11:26 AM, Richard Marscher rmarsc...@localytics.com wrote: It's not a Spark Streaming app, so sorry, I'm not sure of the answer to that. I would assume it should work. On Fri, May 15, 2015 at 2:22 PM, Evo Eftimov evo.efti...@isecc.com wrote: OK, thanks a lot for clarifying that. BTW, was your application a Spark Streaming app? I am also looking for confirmation that FAIR scheduling is supported for Spark Streaming apps. *From:* Richard Marscher [mailto:rmarsc...@localytics.com] *Sent:* Friday, May 15, 2015 7:20 PM *To:* Evo Eftimov *Cc:* Tathagata Das; user *Subject:* Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond The doc is a bit confusing IMO, but at least for my application I had to use a fair pool configuration to get my stages to be scheduled with FAIR. On Fri, May 15, 2015 at 2:13 PM, Evo Eftimov evo.efti...@isecc.com wrote: No pools for the moment. Each of the apps uses the straightforward way, with the Spark conf param for scheduling = FAIR. Spark is running in Standalone mode. Are you saying that configuring pools is mandatory to get FAIR scheduling working? From the docs it seemed optional to me. *From:* Tathagata Das [mailto:t...@databricks.com] *Sent:* Friday, May 15, 2015 6:45 PM *To:* Evo Eftimov *Cc:* user *Subject:* Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond How are you configuring the fair scheduler pools?
On Fri, May 15, 2015 at 8:33 AM, Evo Eftimov evo.efti...@isecc.com wrote: I have run / submitted a few Spark Streaming apps configured with FAIR scheduling on Spark Streaming 1.2.0; however, they still run in FIFO mode. Is FAIR scheduling supported at all for Spark Streaming apps, and from what release / version, e.g. 1.3.1?
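The reason to set the pool inside foreachRDD is that SparkContext.setLocalProperty applies to jobs submitted from the calling thread, and the foreachRDD function does not necessarily run on the thread where the driver code set the property. The Java sketch below imitates this with a plain ThreadLocal (a stand-in; Spark's own storage of local properties differs in detail) and a single worker thread standing in for the streaming job thread. The class name is hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: a per-thread property set on one thread is not seen by
// work running on another thread unless that work sets it itself.
final class LocalPropertySketch {
    // Stand-in for a per-thread setting such as "spark.scheduler.pool".
    static final ThreadLocal<String> POOL = new ThreadLocal<>();

    /** Returns the pool that a task running on a separate worker thread observes. */
    static String poolSeenByWorker(boolean setInsideTask) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            POOL.set("pool1"); // set on the calling ("driver main") thread only
            return worker.submit(() -> {
                if (setInsideTask) {
                    POOL.set("pool1"); // analogous to setLocalProperty inside foreachRDD
                }
                return POOL.get();
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            POOL.remove();
            worker.shutdown();
        }
    }
}
```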
Re: Multiple Kinesis Streams in a single Streaming job
What is the error you are seeing? TD On Thu, May 14, 2015 at 9:00 AM, Erich Ess er...@simplerelevance.com wrote: Hi, is it possible to set up streams from multiple Kinesis streams and process them in a single job? From what I have read, this should be possible; however, the Kinesis layer errors out whenever I try to receive from more than a single Kinesis stream. Here is the code. Currently, I am focused on just getting receivers set up and working for the two Kinesis streams; as such, this code just attempts to print out the contents of both streams:
implicit val formats = Serialization.formats(NoTypeHints)
val conf = new SparkConf().setMaster("local[*]").setAppName("test")
val ssc = new StreamingContext(conf, Seconds(1))
val rawStream = KinesisUtils.createStream(ssc, "erich-test", "kinesis.us-east-1.amazonaws.com", Duration(1000), InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY)
rawStream.map(msg => new String(msg)).print
val loaderStream = KinesisUtils.createStream(ssc, "dev-loader", "kinesis.us-east-1.amazonaws.com", Duration(1000), InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY)
val loader = loaderStream.map(msg => new String(msg)).print
ssc.start()
Thanks, -Erich
Re: Multiple Kinesis Streams in a single Streaming job
A possible problem may be that the Kinesis stream in 1.3 uses the SparkContext app name as the Kinesis application name, which the Kinesis Client Library (KCL) uses to save checkpoints in DynamoDB. Since both Kinesis DStreams are using the same Kinesis application name (as they are in the same StreamingContext / SparkContext / Spark app name), the KCL may be weirdly overwriting the checkpoint information of both Kinesis streams in the same DynamoDB table. Either way, this is going to be fixed in Spark 1.4. On Thu, May 14, 2015 at 4:10 PM, Chris Fregly ch...@fregly.com wrote: Have you tried to union the 2 streams as in the KinesisWordCountASL example https://github.com/apache/spark/blob/branch-1.3/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L120 where 2 streams (against the same Kinesis stream in this case) are created and unioned? It should work the same way, including union() of streams from totally different source types (Kafka, Kinesis, Flume). On Thu, May 14, 2015 at 2:07 PM, Tathagata Das t...@databricks.com wrote: What is the error you are seeing? TD On Thu, May 14, 2015 at 9:00 AM, Erich Ess er...@simplerelevance.com wrote: Hi, is it possible to set up streams from multiple Kinesis streams and process them in a single job? From what I have read, this should be possible; however, the Kinesis layer errors out whenever I try to receive from more than a single Kinesis stream. Here is the code.
Currently, I am focused on just getting receivers set up and working for the two Kinesis streams; as such, this code just attempts to print out the contents of both streams:
implicit val formats = Serialization.formats(NoTypeHints)
val conf = new SparkConf().setMaster("local[*]").setAppName("test")
val ssc = new StreamingContext(conf, Seconds(1))
val rawStream = KinesisUtils.createStream(ssc, "erich-test", "kinesis.us-east-1.amazonaws.com", Duration(1000), InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY)
rawStream.map(msg => new String(msg)).print
val loaderStream = KinesisUtils.createStream(ssc, "dev-loader", "kinesis.us-east-1.amazonaws.com", Duration(1000), InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY)
val loader = loaderStream.map(msg => new String(msg)).print
ssc.start()
Thanks, -Erich
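To make TD's explanation concrete, here is a toy Java model of two streams sharing one checkpoint table. It ASSUMES, for illustration only, that the KCL keeps one table per application name with rows keyed by shard id, and that both streams have a shard with the same id. The class, table names, and sequence numbers are made up; this is not the KCL or DynamoDB API.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical): checkpoint "tables" named after the application,
// each mapping shard id -> last checkpointed sequence number.
final class CheckpointCollision {
    private final Map<String, Map<String, String>> tables = new HashMap<>();

    void checkpoint(String appName, String shardId, String sequenceNumber) {
        tables.computeIfAbsent(appName, name -> new HashMap<>()).put(shardId, sequenceNumber);
    }

    String lastCheckpoint(String appName, String shardId) {
        Map<String, String> table = tables.get(appName);
        return table == null ? null : table.get(shardId);
    }
}
```

In this model, two streams writing under the same application name overwrite each other's row for a shared shard id, while giving each stream its own application name keeps the rows apart.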
Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge
It would be good if you could tell me what I should add to the documentation to make it easier to understand. I can update the docs for the 1.4.0 release. On Tue, May 12, 2015 at 9:52 AM, Lee McFadden splee...@gmail.com wrote: Thanks for explaining, Sean and Cody; this makes sense now. I'd like to help improve this documentation so other Python users don't run into the same thing, so I'll look into that today. On Tue, May 12, 2015 at 9:44 AM Cody Koeninger c...@koeninger.org wrote: One of the packages just contains the streaming-kafka code. The other contains that code, plus everything it depends on. That's what assembly typically means in JVM land. Java/Scala users are accustomed to using their own build tool to include necessary dependencies. JVM dependency management is (thankfully) different from Python dependency management. As far as I can tell, there is no core issue, upstream or otherwise. On Tue, May 12, 2015 at 11:39 AM, Lee McFadden splee...@gmail.com wrote: Thanks again for all the help, folks. I can confirm that simply switching to `--packages org.apache.spark:spark-streaming-kafka-assembly_2.10:1.3.1` makes everything work as intended. I'm honestly not sure what the difference is between the two packages, or why one should be used over the other, but the documentation is currently not intuitive in this matter. If you follow the instructions, initially it will seem broken. Is there any reason why the docs for Python users (or, in fact, all users; Java/Scala users will run into this too, except they are armed with the ability to build their own jar with the dependencies included) should not be changed to use the assembly package by default? Additionally, after a few Google searches yesterday, combined with your help, I'm wondering if the core issue is upstream in Kafka's dependency chain? On Tue, May 12, 2015 at 8:53 AM Ted Yu yuzhih...@gmail.com wrote: bq. it is already in the assembly Yes.
Verified:
$ jar tvf ~/Downloads/spark-streaming-kafka-assembly_2.10-1.3.1.jar | grep yammer | grep Gauge
1329 Sat Apr 11 04:25:50 PDT 2015 com/yammer/metrics/core/Gauge.class
On Tue, May 12, 2015 at 8:05 AM, Sean Owen so...@cloudera.com wrote: It doesn't depend directly on yammer metrics; Kafka does. It wouldn't be correct to declare that it does; it is already in the assembly anyway. On Tue, May 12, 2015 at 3:50 PM, Ted Yu yuzhih...@gmail.com wrote: Currently external/kafka/pom.xml doesn't cite yammer metrics as a dependency.
$ ls -l ~/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar
-rw-r--r-- 1 tyu staff 82123 Dec 17 2013 /Users/tyu/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar
Including the metrics-core jar would not increase the size of the final release artifact much. My two cents.
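Ted's `jar tvf ... | grep` check can also be done from Java, since a jar is a zip file. The helper below (a hypothetical name) uses java.util.zip.ZipFile to test whether a jar contains the entry for a class such as com.yammer.metrics.core.Gauge. It only checks that the .class file is present, not that it is loadable with the rest of the classpath.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.ZipFile;

// Hypothetical helper: Java equivalent of `jar tvf <jar> | grep <class path>`.
final class JarEntryCheck {

    /** Returns true if the jar has an entry for the given fully qualified class name. */
    static boolean containsClass(String jarPath, String className) {
        String entryName = className.replace('.', '/') + ".class";
        try (ZipFile jar = new ZipFile(jarPath)) {
            return jar.getEntry(entryName) != null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, JarEntryCheck.containsClass("spark-streaming-kafka-assembly_2.10-1.3.1.jar", "com.yammer.metrics.core.Gauge") should return true for the assembly jar, matching the `jar tvf` output above.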
Re: [Unit Test Failure] Test org.apache.spark.streaming.JavaAPISuite.testCount failed
Do you get this failure repeatedly? On Thu, May 14, 2015 at 12:55 AM, kf wangf...@huawei.com wrote: Hi all, I got the following error when I ran Spark's unit tests with dev/run-tests on the latest branch-1.4. The latest commit id: commit d518c0369fa412567855980c3f0f426cde5c190d Author: zsxwing zsxw...@gmail.com Date: Wed May 13 17:58:29 2015 -0700 error [info] Test org.apache.spark.streaming.JavaAPISuite.testCount started [error] Test org.apache.spark.streaming.JavaAPISuite.testCount failed: org.apache.spark.SparkException: Error communicating with MapOutputTracker [error] at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113) [error] at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:119) [error] at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:324) [error] at org.apache.spark.SparkEnv.stop(SparkEnv.scala:93) [error] at org.apache.spark.SparkContext.stop(SparkContext.scala:1577) [error] at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:626) [error] at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:597) [error] at org.apache.spark.streaming.TestSuiteBase$class.runStreamsWithPartitions(TestSuiteBase.scala:403) [error] at org.apache.spark.streaming.JavaTestUtils$.runStreamsWithPartitions(JavaTestUtils.scala:102) [error] at org.apache.spark.streaming.TestSuiteBase$class.runStreams(TestSuiteBase.scala:344) [error] at org.apache.spark.streaming.JavaTestUtils$.runStreams(JavaTestUtils.scala:102) [error] at org.apache.spark.streaming.JavaTestBase$class.runStreams(JavaTestUtils.scala:74) [error] at org.apache.spark.streaming.JavaTestUtils$.runStreams(JavaTestUtils.scala:102) [error] at org.apache.spark.streaming.JavaTestUtils.runStreams(JavaTestUtils.scala) [error] at org.apache.spark.streaming.JavaAPISuite.testCount(JavaAPISuite.java:103) [error] ...
[error] Caused by: org.apache.spark.SparkException: Error sending message [message = StopMapOutputTracker] [error] at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116) [error] at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78) [error] at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:109) [error] ... 52 more [error] Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds] [error] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) [error] at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) [error] at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) [error] at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) [error] at scala.concurrent.Await$.result(package.scala:107) [error] at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102) [error] ... 54 more
Re: how to use rdd.countApprox
That is not supposed to happen :/ That is probably a bug. If you have the log4j logs, it would be good to file a JIRA. This may be worth debugging. On Wed, May 13, 2015 at 12:13 PM, Du Li l...@yahoo-inc.com wrote: Actually I tried that before asking. However, it killed the Spark context. :-) Du On Wednesday, May 13, 2015 12:02 PM, Tathagata Das t...@databricks.com wrote: That is a good question. I don't see a direct way to do that. You could try the following:
val jobGroupId = "group-id-based-on-current-time"
rdd.sparkContext.setJobGroup(jobGroupId, "countApprox")
val approxCount = rdd.countApprox(5000).initialValue // job launched with the set job group
rdd.sparkContext.cancelJobGroup(jobGroupId) // cancel the job
On Wed, May 13, 2015 at 11:24 AM, Du Li l...@yahoo-inc.com wrote: Hi TD, do you know how to cancel the rdd.countApprox(5000) tasks after the timeout? Otherwise it keeps running until completion, producing results that are not used but consume resources. Thanks, Du On Wednesday, May 13, 2015 10:33 AM, Du Li l...@yahoo-inc.com.INVALID wrote: Hi TD, Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming app stands a much better chance of completing each batch within the batch interval. Du On Tuesday, May 12, 2015 10:31 PM, Tathagata Das t...@databricks.com wrote: From the code it seems that as soon as rdd.countApprox(5000) returns, you can call pResult.initialValue to get the approximate count at that point in time (that is, after the timeout). Calling pResult.getFinalValue() will further block until the job is over, and give the final correct values that you would have received from rdd.count(). On Tue, May 12, 2015 at 5:03 PM, Du Li l...@yahoo-inc.com.invalid wrote: Hi, I tested the following in my streaming app and hoped to get an approximate count within 5 seconds. However, rdd.countApprox(5000).getFinalValue() seemed to always return only after the job had finished completely, just like rdd.count(), which often exceeded 5 seconds.
The values for low, mean, and high were the same. val pResult = rdd.countApprox(5000) val bDouble = pResult.getFinalValue() logInfo(scountApprox().getFinalValue(): low=${bDouble.low.toLong}, mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}) Can any expert here help explain the right way of usage? Thanks, Du On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID wrote: I have to count RDD's in a spark streaming app. When data goes large, count() becomes expensive. Did anybody have experience using countApprox()? How accurate/reliable is it? The documentation is pretty modest. Suppose the timeout parameter is in milliseconds. Can I retrieve the count value by calling getFinalValue()? Does it block and return only after the timeout? Or do I need to define onComplete/onFail handlers to extract count value from the partial results? Thanks, Du
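A minimal Scala sketch of the usage TD describes, assuming the Spark 1.x `RDD.countApprox(timeout, confidence)` API, which returns a `PartialResult[BoundedDouble]`. The object and method names are hypothetical. `initialValue` is read right after `countApprox` returns (at the latest when the timeout expires), and `onComplete` is shown as an optional way to receive the exact count later without blocking on `getFinalValue()`.

```scala
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.RDD

object CountApproxSketch {
  // Hypothetical helper: the estimate available when countApprox returns,
  // plus whether that estimate is already the exact count.
  def approxCount(rdd: RDD[_], timeoutMs: Long): (BoundedDouble, Boolean) = {
    // Blocks until the job finishes or the timeout expires, whichever comes first.
    val partial: PartialResult[BoundedDouble] = rdd.countApprox(timeoutMs)
    // initialValue does not block; getFinalValue() would wait for the whole job.
    (partial.initialValue, partial.isInitialValueFinal)
  }

  // Hypothetical variant: use the estimate now and get the exact count via a
  // callback (invoked immediately if the result is already final).
  def approxWithCallback(rdd: RDD[_], timeoutMs: Long)(onExact: BoundedDouble => Unit): BoundedDouble = {
    val partial = rdd.countApprox(timeoutMs)
    partial.onComplete(onExact)
    partial.initialValue
  }
}
```

Inside `foreachRDD`, one would call something like `CountApproxSketch.approxCount(rdd, 5000)` and log `low`, `mean`, and `high` from the returned `BoundedDouble`.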
Re: how to use rdd.countApprox
You might get stage information through a SparkListener, but I am not sure whether you can use that information to easily kill stages. That said, I highly recommend using Spark 1.3.1 (or even Spark master). Things move really fast between releases; 1.1.1 feels really old to me :P TD

On Wed, May 13, 2015 at 1:25 PM, Du Li l...@yahoo-inc.com wrote: I call rdd.countApprox() and rdd.sparkContext.setJobGroup() inside dstream.foreachRDD{...}. After calling cancelJobGroup(), the Spark context seems to be no longer valid, which crashes subsequent jobs. My Spark version is 1.1.1. I will investigate this issue further, perhaps after upgrading to 1.3.1, and then file a JIRA if it persists. Is there a way to get the stage or task ID of a particular transformation or action on an RDD and then selectively kill that stage or those tasks? It would be necessary and useful in situations similar to countApprox. Thanks, Du
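A hedged sketch of TD's job-group suggestion using the two-argument `setJobGroup(groupId, description)` signature. The helper name, the UUID-based group id, and the description string are assumptions. Du reported that on Spark 1.1.1 cancelling the group left the context unusable, which TD called a probable bug, so treat this as something to verify on your own version. The `clearJobGroup()` call is an addition so that later jobs submitted from the same thread are not tagged with the cancelled group.

```scala
import java.util.UUID
import org.apache.spark.SparkContext
import org.apache.spark.partial.BoundedDouble
import org.apache.spark.rdd.RDD

object CancellableCountApprox {
  // Hypothetical helper: estimate the count within timeoutMs, then cancel leftover tasks.
  def countWithin(rdd: RDD[_], timeoutMs: Long): BoundedDouble = {
    val sc: SparkContext = rdd.sparkContext
    // Unique group id so that only this count's job is cancelled (naming scheme is an assumption).
    val groupId = s"countApprox-${UUID.randomUUID()}"
    // Jobs submitted from this thread from now on belong to groupId.
    sc.setJobGroup(groupId, "approximate count with timeout")
    try {
      rdd.countApprox(timeoutMs).initialValue
    } finally {
      // Cancel tasks still running after the timeout (no effect if the job already finished).
      sc.cancelJobGroup(groupId)
      // Stop tagging later jobs from this thread with the cancelled group.
      sc.clearJobGroup()
    }
  }
}
```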
Re: DStream Union vs. StreamingContext Union
@Vadim What happened when you tried unioning using DStream.union in Python? TD

On Tue, May 12, 2015 at 9:53 AM, Evo Eftimov evo.efti...@isecc.com wrote: I can confirm it does work in Java.

From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] Sent: Tuesday, May 12, 2015 5:53 PM To: Evo Eftimov Cc: Saisai Shao; user@spark.apache.org Subject: Re: DStream Union vs. StreamingContext Union
Thanks Evo. I tried chaining DStream unions like what you have and it didn't work for me. But passing multiple arguments to StreamingContext.union worked fine. Any idea why? I am using Python, BTW.

On Tue, May 12, 2015 at 12:45 PM, Evo Eftimov evo.efti...@isecc.com wrote: You can also union multiple DstreamRDDs this way: DstreamRDD1.union(DstreamRDD2).union(DstreamRDD3), etc. PS: the API is not "redundant"; it offers several ways of achieving the same thing as a convenience, depending on the situation.

From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] Sent: Tuesday, May 12, 2015 5:37 PM To: Saisai Shao Cc: user@spark.apache.org Subject: Re: DStream Union vs. StreamingContext Union
Thanks Saisai. That makes sense. It just seems redundant to have both.

On Mon, May 11, 2015 at 10:36 PM, Saisai Shao sai.sai.s...@gmail.com wrote: DStream.union can only union two DStreams, one of which is itself, while StreamingContext.union can union an array of DStreams. Internally, DStream.union is a special case of StreamingContext.union:
    def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
So there's no difference. If you want to union more than two DStreams, just use the one in StreamingContext; otherwise, both APIs are fine.

2015-05-12 6:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com: Can someone explain to me the difference between DStream union and StreamingContext union? When do you use one vs. the other? Thanks, Vadim
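To illustrate Saisai's point in Scala, here is a sketch contrasting one `StreamingContext.union` over a sequence with chained `DStream.union` calls. Both build `UnionDStream`s; the chained form just nests them. The object and method names are hypothetical, and this says nothing about the PySpark behavior Vadim saw.

```scala
import scala.reflect.ClassTag
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

object UnionSketch {
  // One UnionDStream whose parents are all of the given streams.
  def unionAll[T: ClassTag](ssc: StreamingContext, streams: Seq[DStream[T]]): DStream[T] =
    ssc.union(streams)

  // Pairwise chaining ((s1 union s2) union s3) ..., i.e. nested UnionDStreams.
  // Requires a non-empty sequence.
  def unionChained[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] =
    streams.reduce((a, b) => a.union(b))
}
```

Either result can then be used like any other DStream; the single-call form simply avoids the nesting when there are many inputs.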
Re: DStream Union vs. StreamingContext Union
I wonder whether that may be a bug in the Python API. Please file it as a JIRA along with sample code to reproduce it and the sample output you get.

On Tue, May 12, 2015 at 10:00 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: @TD I kept getting an empty RDD (i.e. rdd.take(1) was False).
Re: how to use rdd.countApprox
From the code it seems that as soon as rdd.countApprox(5000) returns, you can call pResult.initialValue to get the approximate count at that point in time (that is, after the timeout). Calling pResult.getFinalValue() will further block until the job is over, and give the final correct values that you would have received from rdd.count().

On Tue, May 12, 2015 at 5:03 PM, Du Li l...@yahoo-inc.com.invalid wrote: Hi, I tested the following in my streaming app and hoped to get an approximate count within 5 seconds. However, rdd.countApprox(5000).getFinalValue() seemed to always return only after the job finished completely, just like rdd.count(), which often exceeded 5 seconds. The values for low, mean, and high were the same.
    val pResult = rdd.countApprox(5000)
    val bDouble = pResult.getFinalValue()
    logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong}, mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")
Can any expert here help explain the right way to use it? Thanks, Du

On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID wrote: I have to count RDDs in a Spark Streaming app. When the data gets large, count() becomes expensive. Has anybody had experience using countApprox()? How accurate/reliable is it? The documentation is pretty modest. Suppose the timeout parameter is in milliseconds. Can I retrieve the count value by calling getFinalValue()? Does it block and return only after the timeout? Or do I need to define onComplete/onFail handlers to extract the count value from the partial results? Thanks, Du
Re: Receiver Fault Tolerance
Incorrect. The receiver runs in an executor, just like any other task. In cluster mode, the driver runs on a worker, but it launches executors on OTHER workers in the cluster. It's those executors running on other workers that run the tasks, and also the receivers.

On Wed, May 6, 2015 at 5:09 AM, James King jakwebin...@gmail.com wrote: In the O'Reilly book Learning Spark, Chapter 10, section "24/7 Operation", it talks about "Receiver Fault Tolerance". I'm unsure what a receiver is here. From reading it, it sounds like when you submit an application to the cluster in cluster mode, i.e. --deploy-mode cluster, the driver program will run on a worker, and in this case this worker is seen as a receiver because it is consuming messages from the source. Is the above understanding correct, or is there more to it?
Re: How update counter in cassandra
This may help: http://www.slideshare.net/helenaedelson/lambda-architecture-with-spark-spark-streaming-kafka-cassandra-akka-and-scala

On Wed, May 6, 2015 at 5:35 PM, Sergio Jiménez Barrio drarse.a...@gmail.com wrote: I have a counter column family in Cassandra. I want to update these counters with a Spark Streaming application. How can I update Cassandra counters with Spark? Thanks.
Re: Too many open files when using Spark to consume messages from Kafka
Is the function ingestToMysql running on the driver or on the executors? Accordingly, you can try debugging while running in a distributed manner, with and without calling the function. If you don't get "too many open files" without calling ingestToMysql(), the problem is likely to be in ingestToMysql(). If you get the problem even without calling ingestToMysql(), then the problem may be in Kafka. If the problem is occurring in the driver, then it's the DirectKafkaInputDStream code. If the problem is occurring in the executor, then the problem is in KafkaRDD. TD

On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote: Maybe add statement.close() in the finally block? Streaming/Kafka experts may have better insight.

On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Thanks for the suggestion. I ran the command and the limit is 1024. Based on my understanding, the connector to Kafka should not open so many files. Do you think there is a possible socket leak? BTW, in every batch, which is 5 seconds, I output some results to MySQL:
    def ingestToMysql(data: Array[String]) {
      val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
      var sql = "insert into loggingserver1 values "
      data.foreach(line => sql += line)
      sql = sql.dropRight(1)
      sql += ";"
      logger.info(sql)
      var conn: java.sql.Connection = null
      try {
        conn = DriverManager.getConnection(url)
        val statement = conn.createStatement()
        statement.executeUpdate(sql)
      } catch {
        case e: Exception => logger.error(e.getMessage())
      } finally {
        if (conn != null) {
          conn.close
        }
      }
    }
I am not sure whether the leak originates from the Kafka connector or the SQL connections. Bill

On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote: Can you run the command 'ulimit -n' to see the current limit? To configure ulimit settings on Ubuntu, edit /etc/security/limits.conf. Cheers

On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am using the direct approach to receive real-time data from Kafka, as described in the following link: https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html My code follows the direct word count example: https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala After around 12 hours, I got the following error messages in the Spark log:
15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time 143033869 ms
org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files)
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
    at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
    at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
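Following Ted's suggestion, here is a sketch that closes both the `Statement` and the `Connection` in `finally` blocks (both are `AutoCloseable` on Java 7+). The object and helper names are hypothetical, and so is the assumption that each row arrives as a pre-formatted tuple such as `(1,'a')` without a trailing comma. It does not by itself tell you whether the leak is here or in the Kafka connector; TD's with/without experiment is what separates the two.

```scala
import java.sql.DriverManager

object MysqlSink {
  // Run body with the resource and always close it, even if body throws.
  def using[A <: AutoCloseable, B](resource: A)(body: A => B): B =
    try body(resource) finally if (resource != null) resource.close()

  // Builds "insert into <table> values (..),(..);" from pre-formatted tuples.
  def buildInsert(table: String, rows: Seq[String]): String =
    rows.mkString(s"insert into $table values ", ",", ";")

  // Both the Statement and the Connection are closed on every path.
  def ingestToMysql(url: String, table: String, rows: Seq[String]): Int =
    if (rows.isEmpty) 0 // nothing to write, so no connection is opened
    else using(DriverManager.getConnection(url)) { conn =>
      using(conn.createStatement()) { stmt =>
        stmt.executeUpdate(buildInsert(table, rows))
      }
    }
}
```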
Re: Join between Streaming data vs Historical Data in spark
Have you taken a look at the join section in the streaming programming guide? http://spark.apache.org/docs/latest/streaming-programming-guide.html#stream-dataset-joins

On Wed, Apr 29, 2015 at 7:11 AM, Rendy Bambang Junior rendy.b.jun...@gmail.com wrote: Let's say I have transaction data and visit data.

visit
| userId | Visit source | Timestamp |
| A      | google ads   | 1         |
| A      | facebook ads | 2         |

transaction
| userId | total price | timestamp |
| A      | 100         | 248384    |
| B      | 200         | 43298739  |

I want to join transaction data and visit data to do sales attribution. I want to do it in real time whenever a transaction occurs (streaming). Is it scalable to join one piece of data with very big historical data using the join function in Spark? If it is not, then how is it usually done? Visits need to be historical, since a visit can happen any time before the transaction (e.g. a visit one year before the transaction occurs). Rendy
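A sketch of the stream-dataset join from that guide section, applied to Rendy's example: each batch of transactions is joined inside `transform` with a historical visits RDD keyed by `userId`, and for each transaction the latest visit at or before its timestamp is kept. The case classes, the "last visit wins" attribution rule, and the helper names are assumptions for illustration; it relies on pair-RDD methods being available implicitly (Spark 1.3+). Transactions with no earlier visit are dropped by the inner join. Whether it scales depends mostly on the size of the visits RDD, so caching it is likely worth trying.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object Attribution {
  // Hypothetical record types mirroring the visit/transaction tables above.
  case class Visit(userId: String, source: String, ts: Long)
  case class Txn(userId: String, totalPrice: Double, ts: Long)

  // Per-batch logic: join on userId, keep visits at or before the transaction,
  // then keep the most recent such visit for each transaction.
  def attributeBatch(txns: RDD[Txn], visitsByUser: RDD[(String, Visit)]): RDD[(Txn, Visit)] =
    txns.map(t => (t.userId, t))
      .join(visitsByUser)
      .filter { case (_, (t, v)) => v.ts <= t.ts }
      .map { case (_, (t, v)) => (t, v) }
      .reduceByKey((a, b) => if (a.ts >= b.ts) a else b)

  // Streaming version: transform joins each batch RDD with the static historical RDD.
  def attribute(txns: DStream[Txn], visitsByUser: RDD[(String, Visit)]): DStream[(Txn, Visit)] =
    txns.transform(rdd => attributeBatch(rdd, visitsByUser))
}
```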
Re: implicit function in SparkStreaming
I believe the implicit def is pulling the enclosing class (in which the def is defined) into the closure, and that class is not serializable.

On Wed, Apr 29, 2015 at 4:20 AM, guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk wrote: Hi guys, I'm puzzled why using an implicit function in Spark Streaming causes "Task not serializable". Code snippet:
    implicit final def str2KeyValue(s: String): (String, String) = {
      val message = s.split("\\|")
      if (message.length >= 2) (message(0), message(1))
      else if (message.length == 1) (message(0), "")
      else ("", "")
    }

    def filter(stream: DStream[String]): DStream[String] = {
      stream.filter(s => { s._1 == "Action" && s._2 == "TRUE" })
    }
Could you please give me some pointers? Thank you. -- guoqing0...@yahoo.com.hk
Re: Too many open files when using Spark to consume messages from Kafka
Also cc'ing Cody. @Cody maybe there is a reason for doing connection pooling even if there is no performance difference. TD
Re: Driver memory leak?
It could be related to this: https://issues.apache.org/jira/browse/SPARK-6737 This was fixed in Spark 1.3.1.

On Wed, Apr 29, 2015 at 8:38 AM, Sean Owen so...@cloudera.com wrote: Not sure what you mean. It's already in CDH since 5.4 = 1.3.0. (This isn't the place to ask about CDH.) I also don't think that's the problem. The process did not run out of memory.

On Wed, Apr 29, 2015 at 2:08 PM, Serega Sheypak serega.shey...@gmail.com wrote: "The memory leak could be related to this https://issues.apache.org/jira/browse/SPARK-5967 defect that was resolved in Spark 1.2.2 and 1.3.0." @Sean Will it be backported to CDH? I didn't find that bug in the CDH 5.4 release notes.

2015-04-29 14:51 GMT+02:00 Conor Fennell conor.fenn...@altocloud.com: The memory leak could be related to this https://issues.apache.org/jira/browse/SPARK-5967 defect that was resolved in Spark 1.2.2 and 1.3.0. It also was a HashMap causing the issue. -Conor

On Wed, Apr 29, 2015 at 12:01 PM, Sean Owen so...@cloudera.com wrote: Please use user@, not dev@. This message does not appear to be from your driver. It also doesn't say you ran out of memory. It says you didn't tell YARN to let it use the memory you want. Look at the memory overhead param, and please search first for related discussions.

On Apr 29, 2015 11:43 AM, wyphao.2007 wyphao.2...@163.com wrote: Hi, dear developers, I am using Spark Streaming to read data from Kafka. The program had already run for about 120 hours, but today it failed because of a driver OOM, as follows:
Container [pid=49133,containerID=container_1429773909253_0050_02_01] is running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB physical memory used; 3.2 GB of 50 GB virtual memory used. Killing container.
I set --driver-memory to 2g. In my mind, the driver is responsible for job scheduling and job monitoring (please correct me if I'm wrong), so why is it using so much memory? So I used jmap to monitor another program (which had already run for about 48 hours): sudo /home/q/java7/jdk1.7.0_45/bin/jmap -histo:live 31256. The result was as follows: the java.util.HashMap$Entry and java.lang.Long objects were using about 600 MB of memory! I also used jmap to monitor another program (which had run for about 1 hour); there, the java.util.HashMap$Entry and java.lang.Long objects didn't use as much memory. But I found that, as time goes by, the java.util.HashMap$Entry and java.lang.Long objects occupy more and more memory. Is this a driver memory leak, or is there another reason? Thanks. Best Regards
Re: Re: implicit function in SparkStreaming
Could you put the implicit def in an object? That should work, as objects are never serialized.

On Wed, Apr 29, 2015 at 6:28 PM, guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk wrote: Thank you for your pointers, they are very helpful to me. In this scenario, how can I use the implicit def in the enclosing class?
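A sketch of TD's suggestion: the implicit conversion lives in a standalone top-level object, so a closure that uses it refers to the object statically instead of capturing a non-serializable enclosing instance. The object names and `isActionTrue` are hypothetical; the DStream call is shown only as a comment so the block runs without Spark.

```scala
import scala.language.implicitConversions

// Top-level object: closures reference it statically, so it is not serialized with them.
object KeyValueImplicits {
  implicit def str2KeyValue(s: String): (String, String) = {
    val message = s.split("\\|")
    if (message.length >= 2) (message(0), message(1))
    else if (message.length == 1) (message(0), "")
    else ("", "")
  }
}

object ActionFilter {
  import KeyValueImplicits._

  // s._1 and s._2 trigger the implicit String => (String, String) conversion.
  def isActionTrue(s: String): Boolean = s._1 == "Action" && s._2 == "TRUE"

  // In the streaming job (not compiled here):
  //   def filter(stream: DStream[String]): DStream[String] = stream.filter(isActionTrue)
}
```

Moving the predicate into the object as well keeps the closure passed to `filter` free of references to the enclosing class.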
Re: Shuffle files not cleaned up (Spark 1.2.1)
What was the state of your streaming application? Was it falling behind, with a large and increasing scheduling delay? TD

On Thu, Apr 23, 2015 at 11:31 AM, N B nb.nos...@gmail.com wrote: Thanks for the response, Conor. I tried with those settings, and for a while it seemed like it was cleaning up shuffle files after itself. However, exactly 5 hours later it started throwing exceptions and eventually stopped working again. A sample stack trace is below. What is curious about 5 hours is that I set the cleaner TTL to 5 hours after changing the max window size to 1 hour (down from 6 hours, in order to test). It also stopped cleaning the shuffle files after this started happening. Any idea why this could be happening?
2015-04-22 17:39:52,040 ERROR Executor task launch worker-989 Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147)
java.lang.Exception: Could not compute split, block input-0-1429706099000 not found
    at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Thanks NB

On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell conor.fenn...@altocloud.com wrote: Hi, we set spark.cleaner.ttl to some reasonable time and also set spark.streaming.unpersist=true. Together, those cleaned up the shuffle files for us. -Conor

On Tue, Apr 21, 2015 at 8:18 AM, N B nb.nos...@gmail.com wrote: We already have a cron job in place to clean just the shuffle files. However, what I would really like to know is whether there is a proper way of telling Spark to clean up these files once it's done with them. Thanks NB

On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele gangele...@gmail.com wrote: Write a cron job for this, like below:
    12 * * * * find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
    32 * * * * find /tmp -type d -cmin +1440 -name "spark-*-*-*" -prune -exec rm -rf {} \+
    52 * * * * find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin +1440 -name "spark-*-*-*" -prune -exec rm -rf {} \+

On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote: Hi all, I had posed this query as part of a different thread but did not get a response there, so I am creating a new thread hoping to catch someone's attention. We are experiencing an issue with shuffle files being left behind and not being cleaned up by Spark. Since this is a Spark Streaming application, it is expected to stay up indefinitely, so shuffle files not being cleaned up is a big problem right now. Our max window size is 6 hours, so we have set up a cron job to clean up shuffle files older than 12 hours; otherwise they would eat up all our disk space. Please see the following. It seems the non-cleaning of shuffle files is being documented in 1.3.1. https://github.com/apache/spark/pull/5074/files https://issues.apache.org/jira/browse/SPARK-5836 Also, for some reason, the following JIRAs that were reported as functional issues were closed as duplicates of the above documentation bug. Does this mean that this issue won't be tackled at all? https://issues.apache.org/jira/browse/SPARK-3563 https://issues.apache.org/jira/browse/SPARK-4796 https://issues.apache.org/jira/browse/SPARK-6011 Any further insight into whether this is being looked into, and meanwhile how to handle shuffle files, will be greatly appreciated. Thanks NB
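For reference, a sketch of the two settings Conor mentions, applied to a `SparkConf` before the streaming context is created. `spark.cleaner.ttl` is given in seconds and belongs to the Spark 1.x line discussed here. The helper name and the rule of setting the TTL to twice the longest window are assumptions; the underlying idea is only that the TTL should comfortably exceed any window, since older data and metadata are dropped.

```scala
import org.apache.spark.SparkConf

object CleanupConf {
  // Hypothetical helper: TTL set to twice the longest window (an assumed safety margin).
  def withCleanup(conf: SparkConf, maxWindowHours: Int): SparkConf = {
    require(maxWindowHours > 0, "window must be positive")
    val ttlSeconds = maxWindowHours.toLong * 2 * 3600 // spark.cleaner.ttl is in seconds
    conf
      .set("spark.cleaner.ttl", ttlSeconds.toString)
      // Let Spark Streaming unpersist the RDDs it generated once they are no longer needed.
      .set("spark.streaming.unpersist", "true")
  }
}
```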
Re: Convert DStream to DataFrame
Did you check out the latest streaming programming guide? http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations You also need to be aware that to convert JSON RDDs to a DataFrame, sqlContext has to make a pass over the data to learn the schema. This will fail if a batch has no data, so you have to safeguard against that.

On Wed, Apr 22, 2015 at 6:19 AM, ayan guha guha.a...@gmail.com wrote: What about sqlContext.createDataFrame(rdd)?

On 22 Apr 2015 23:04, Sergio Jiménez Barrio drarse.a...@gmail.com wrote: Hi, I am using Kafka with Apache Stream to send JSON to Apache Spark:
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
Now I want to parse the created DStream into a DataFrame, but I don't know if Spark 1.3 has some easy way to do this. Any suggestions? I can get the message with:
    val lines = messages.map(_._2)
Thanks for everything. Sergio J.
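A sketch of the guarded conversion TD describes, assuming the Spark 1.3 API (`SQLContext.jsonRDD`; from 1.4 the equivalent is `sqlContext.read.json`). Empty batches are skipped with `RDD.isEmpty()` (available since 1.3) before schema inference runs. The lazily created singleton `SQLContext` and all object and method names here are hypothetical, similar in spirit to the pattern in the guide section linked above.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.streaming.dstream.DStream

object JsonStreamToDataFrame {
  @transient private var instance: SQLContext = _

  // One SQLContext per JVM, created lazily on the driver.
  def sqlContextFor(sc: SparkContext): SQLContext = synchronized {
    if (instance == null) instance = new SQLContext(sc)
    instance
  }

  // Schema inference needs at least one record, so empty batches yield None.
  def toDataFrame(sqlContext: SQLContext, json: RDD[String]): Option[DataFrame] =
    if (json.isEmpty()) None else Some(sqlContext.jsonRDD(json))

  // lines would be messages.map(_._2) from the Kafka direct stream.
  def process(lines: DStream[String]): Unit =
    lines.foreachRDD { rdd =>
      toDataFrame(sqlContextFor(rdd.sparkContext), rdd).foreach(df => df.show())
    }
}
```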
Re: Map Question
Is mylist present on every executor? If not, then you have to pass it on, and broadcasts are the best way to pass it on. But note that once broadcast, it will be immutable at the executors, and if you update the list at the driver, you will have to broadcast it again. TD

On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming with Python. For each RDD, I call a map, i.e., myrdd.map(myfunc); myfunc is in a separate Python module. In yet another separate Python module I have a global list, i.e. mylist, that's populated with metadata. I can't get myfunc to see mylist... it's always empty. Alternatively, I guess I could pass mylist to map. Any suggestions? Thanks, Vadim
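A Scala sketch of the broadcast approach (PySpark has the same `sc.broadcast(...)` / `.value` pair). The metadata is shipped to each executor once instead of relying on a module-level global. As TD notes, the broadcast value is read-only on the executors, so a changed list means creating a new broadcast. The object names and the `Set[String]` metadata type are assumptions.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

object BroadcastLookup {
  // Driver side: ship the metadata once; call again if the metadata changes.
  def publish(sc: SparkContext, metadata: Set[String]): Broadcast[Set[String]] =
    sc.broadcast(metadata)

  // Executor side: read the broadcast value inside the task.
  def tag(rdd: RDD[String], meta: Broadcast[Set[String]]): RDD[(String, Boolean)] =
    rdd.map(x => (x, meta.value.contains(x)))
}
```

The same code runs unchanged with a `local[*]` master and on a cluster.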
Re: Not able run multiple tasks in parallel, spark streaming
Furthermore, just to explain: doing arr.par.foreach does not help because it is not really running anything; it is only setting up the computation. Doing the setup in parallel does not mean that the jobs will be executed concurrently. Also, from your code it seems that your pairs of DStreams don't interact with each other (that is, pair1 doesn't interact with pair2). In that case you could run them in separate applications, which would allow them to run in parallel. On Tue, Apr 21, 2015 at 11:53 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can enable this flag to run multiple jobs concurrently. It might not be production ready, but you can give it a try by setting it on your SparkConf: conf.set("spark.streaming.concurrentJobs", "2") Refer to TD's answer here http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming#answers-header for more information. Thanks Best Regards On Wed, Apr 22, 2015 at 8:57 AM, Abhay Bansal abhaybansal.1...@gmail.com wrote: Hi, I have a use case wherein I have to join multiple Kafka topics in parallel. So if there are 2n topics, there is a one-to-one mapping of topics which need to be joined. val arr = ... for (condition) { val dStream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics1).map(a => (getKey1(a._2), a._2)) val dStream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics2).map(a => (getKey2(a._2), a._2)) arr(counter) = (dStream1, dStream2); counter += 1 } arr.par.foreach { case (dStream1, dStream2) => try { val joined = dStream1.join(dStream2, 4); joined.saveAsTextFiles("joinedData") } catch { case t: Exception => t.printStackTrace() } } ssc.start() ssc.awaitTermination() Doing so, the streams are getting joined sequentially. Is there a way out of this? I am new to Spark and would appreciate any suggestions around this. Thanks, -Abhay
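TD's distinction between setting up jobs and running them can be sketched with a toy scheduler in plain Python. This is a model under stated assumptions, not Spark's JobScheduler: `make_job`, `run_batch` and `concurrent_jobs` are hypothetical names, and `concurrent_jobs` plays the role the thread describes for `spark.streaming.concurrentJobs`. Building the job closures (the analogue of `arr.par.foreach`) executes nothing; only the number of workers executing them decides whether they overlap.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def make_job(name, log, lock, duration=0.05):
    """'Setting up' a job only builds a closure; nothing runs yet, much like
    declaring DStream operations before ssc.start()."""
    def job():
        with lock:
            log.append(("start", name))
        time.sleep(duration)  # stand-in for the real join/save work
        with lock:
            log.append(("end", name))
    return job


def run_batch(jobs, concurrent_jobs=1):
    """Toy per-batch scheduler: with concurrent_jobs=1 the jobs run one after
    another; a larger pool lets independent jobs overlap."""
    with ThreadPoolExecutor(max_workers=concurrent_jobs) as pool:
        futures = [pool.submit(job) for job in jobs]
        for f in futures:
            f.result()  # re-raise any exception from the job


def max_overlap(log):
    """Largest number of jobs that were running at the same moment."""
    running = peak = 0
    for event, _ in log:
        running += 1 if event == "start" else -1
        peak = max(peak, running)
    return peak
```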
Re: Map Question
Absolutely. The same code would work in local as well as distributed mode! On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Can I use broadcast vars in local mode? On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com wrote: Yep. Not efficient. Pretty bad, actually. That's why broadcast variables were introduced right at the very beginning of Spark. On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks TD. I was looking into broadcast variables. Right now I am running it locally...and I plan to move it to production on EC2. The way I fixed it is by doing myrdd.map(lambda x: (x, mylist)).map(myfunc), but I don't think it's efficient? mylist is filled only once at the start and never changes. Vadim On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com wrote: Is mylist present on every executor? If not, then you have to pass it on, and broadcast variables are the best way to do that. But note that once broadcast, it will be immutable at the executors; if you update the list at the driver, you will have to broadcast it again. TD On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming with Python. For each RDD, I call a map, i.e., myrdd.map(myfunc); myfunc is in a separate Python module. In yet another separate Python module I have a global list, i.e. mylist, that's populated with metadata. I can't get myfunc to see mylist...it's always empty. Alternatively, I guess I could pass mylist to map. Any suggestions? Thanks, Vadim
Re: Map Question
Yep. Not efficient. Pretty bad, actually. That's why broadcast variables were introduced right at the very beginning of Spark. On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks TD. I was looking into broadcast variables. Right now I am running it locally...and I plan to move it to production on EC2. The way I fixed it is by doing myrdd.map(lambda x: (x, mylist)).map(myfunc), but I don't think it's efficient? mylist is filled only once at the start and never changes. Vadim On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com wrote: Is mylist present on every executor? If not, then you have to pass it on, and broadcast variables are the best way to do that. But note that once broadcast, it will be immutable at the executors; if you update the list at the driver, you will have to broadcast it again. TD On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming with Python. For each RDD, I call a map, i.e., myrdd.map(myfunc); myfunc is in a separate Python module. In yet another separate Python module I have a global list, i.e. mylist, that's populated with metadata. I can't get myfunc to see mylist...it's always empty. Alternatively, I guess I could pass mylist to map. Any suggestions? Thanks, Vadim
Re: Spark Streaming updateStateByKey throws OutOfMemory Error
It could very well be that your executor memory is not enough to store the state RDDs AND operate on the data. 1G per executor is quite low. Definitely give more memory. And have you tried increasing the number of partitions (specify the number of partitions in updateStateByKey)? On Wed, Apr 22, 2015 at 2:34 AM, Sourav Chandra sourav.chan...@livestream.com wrote: Anyone? On Wed, Apr 22, 2015 at 12:29 PM, Sourav Chandra sourav.chan...@livestream.com wrote: Hi Olivier, the update function is as below: val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => { val previousCount = state.getOrElse((0L, 0L))._2 var startValue: IConcurrentUsers = ConcurrentViewers(0) var currentCount = 0L val lastIndexOfConcurrentUsers = values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers]) val subList = values.slice(0, lastIndexOfConcurrentUsers) val currentCountFromSubList = subList.foldLeft(startValue)(_ op _).count + previousCount val lastConcurrentViewersCount = values(lastIndexOfConcurrentUsers).count if (math.abs(lastConcurrentViewersCount - currentCountFromSubList) >= 1) { logger.error( s"Count using state updation $currentCountFromSubList, " + s"ConcurrentUsers count $lastConcurrentViewersCount" + s" resetting to $lastConcurrentViewersCount" ) currentCount = lastConcurrentViewersCount } val remainingValuesList = values.diff(subList) startValue = ConcurrentViewers(currentCount) currentCount = remainingValuesList.foldLeft(startValue)(_ op _).count if (currentCount < 0) { logger.error( s"ERROR: Got new count $currentCount < 0, value:$values, state:$state, resetting to 0" ) currentCount = 0 } // to stop pushing subsequent 0 after receiving first 0 if (currentCount == 0 && previousCount == 0) None else Some(previousCount, currentCount) } trait IConcurrentUsers { val count: Long; def op(a: IConcurrentUsers): IConcurrentUsers = IConcurrentUsers.op(this, a) } object IConcurrentUsers { def op(a: IConcurrentUsers, b: IConcurrentUsers): IConcurrentUsers = (a, b) match { case (_, _: ConcurrentViewers) => ConcurrentViewers(b.count) case (_: ConcurrentViewers, _: IncrementConcurrentViewers) => ConcurrentViewers(a.count + b.count) case (_: ConcurrentViewers, _: DecrementConcurrentViewers) => ConcurrentViewers(a.count - b.count) } } case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers case class ConcurrentViewers(count: Long) extends IConcurrentUsers Also, the error stack trace copied from the executor logs is: java.lang.OutOfMemoryError: Java heap space at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183) at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564) at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285) at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77) at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927) at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927) at
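For readers less familiar with updateStateByKey, here is a plain-Python model of one batch of its semantics, as we understand them: the update function is called for every key that has either new values or existing state, and returning `None` drops the key, which is what keeps the state (and the memory it occupies) bounded. `update_state_by_key` and `running_count` are hypothetical helpers; the partition count TD suggests raising is, in the Scala API, an extra argument to `updateStateByKey` and is not modeled here.

```python
def update_state_by_key(state, batch, update_fn):
    """Toy model of updateStateByKey for a single batch.

    state: dict mapping key -> previous state.
    batch: list of (key, value) pairs that arrived in this batch.
    update_fn(new_values, old_state_or_None) returns the new state,
    or None to remove the key from the state entirely.
    """
    grouped = {}
    for key, value in batch:
        grouped.setdefault(key, []).append(value)
    new_state = {}
    # Keys with old state are updated even when no new values arrived.
    for key in set(state) | set(grouped):
        result = update_fn(grouped.get(key, []), state.get(key))
        if result is not None:
            new_state[key] = result
    return new_state


def running_count(values, old):
    total = (old or 0) + sum(values)
    # Like the "stop pushing 0" idea in the thread: drop keys that reach 0.
    return None if total == 0 else total
```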
Re: Map Question
Can you give the full code, especially myfunc? On Wed, Apr 22, 2015 at 2:20 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Here's what I did: print 'BROADCASTING...' broadcastVar = sc.broadcast(mylist) print broadcastVar print broadcastVar.value print 'FINISHED BROADCASTING...' The above works fine, but when I call myrdd.map(myfunc) I get NameError: global name 'broadcastVar' is not defined. The myfunc function is in a different module. How do I make it aware of broadcastVar? On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Great. Will try to modify the code. Always room to optimize! On Wed, Apr 22, 2015 at 2:11 PM, Tathagata Das t...@databricks.com wrote: Absolutely. The same code would work in local as well as distributed mode! On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Can I use broadcast vars in local mode? On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com wrote: Yep. Not efficient. Pretty bad, actually. That's why broadcast variables were introduced right at the very beginning of Spark. On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks TD. I was looking into broadcast variables. Right now I am running it locally...and I plan to move it to production on EC2. The way I fixed it is by doing myrdd.map(lambda x: (x, mylist)).map(myfunc), but I don't think it's efficient? mylist is filled only once at the start and never changes. Vadim On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com wrote: Is mylist present on every executor? If not, then you have to pass it on, and broadcast variables are the best way to do that. But note that once broadcast, it will be immutable at the executors; if you update the list at the driver, you will have to broadcast it again. TD On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming with Python. For each RDD, I call a map, i.e., myrdd.map(myfunc); myfunc is in a separate Python module. In yet another separate Python module I have a global list, i.e. mylist, that's populated with metadata. I can't get myfunc to see mylist...it's always empty. Alternatively, I guess I could pass mylist to map. Any suggestions? Thanks, Vadim
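The `NameError` arises because `broadcastVar` is a global of the driver script, while `myfunc` looks names up in its own module. One common approach, sketched here in plain Python, is to pass the broadcast handle to `myfunc` as an argument and bind it with `functools.partial` (a `lambda x: myfunc(x, broadcastVar)` would work the same way). `FakeBroadcast` is a hypothetical stand-in for the object returned by `sc.broadcast(mylist)`; with PySpark you would pass `bound` to `myrdd.map`.

```python
from functools import partial


class FakeBroadcast:
    """Hypothetical stand-in for the object returned by sc.broadcast(...);
    only the .value attribute is modeled."""

    def __init__(self, value):
        self.value = value


# Imagine this function lives in a separate module (hypothetical mymodule.py).
def myfunc(record, lookup):
    """Receives the broadcast handle as a parameter instead of reading a
    global name, so it does not matter which module defines it."""
    return record, record in lookup.value


# Driver side: bind the handle once, then pass the bound function to map.
broadcastVar = FakeBroadcast({"x", "y"})
bound = partial(myfunc, lookup=broadcastVar)  # with PySpark: myrdd.map(bound)
result = list(map(bound, ["x", "z"]))
```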
Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized
Vaguely makes sense. :) Wow, that's an interesting corner case. On Wed, Apr 22, 2015 at 1:57 PM, Jean-Pascal Billaud j...@tellapart.com wrote: I now have a fair understanding of the situation after looking at the javap output. So as a reminder: dstream.map(tuple => { val w = StreamState.fetch[K,W](state.prefixKey, tuple._1) (tuple._1, (tuple._2, w)) }) And StreamState being a very simple standalone object: object StreamState { def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K) : Option[V] = None } Basically the serialization failed because the ClassTag[K] came from the enclosing class in which the dstream.map() code is running, e.g.: class A[K : ClassTag](val dstream: DStream[K]) { [...] def fun = dstream.map(tuple => { val w = StreamState.fetch[K,W](state.prefixKey, tuple._1) (tuple._1, (tuple._2, w)) }) } Therefore the instance of class A is being serialized, and it fails when the dstream field calls writeObject(), which checks the graph field... The fact that graph is not set might be expected, given that I have not started the context yet... Cheers, On Tue, Apr 21, 2015 at 6:17 PM, Tathagata Das t...@databricks.com wrote: It is kind of unexpected; I can imagine a real scenario under which it should trigger. But obviously I am missing something :) TD On Tue, Apr 21, 2015 at 4:23 PM, Jean-Pascal Billaud j...@tellapart.com wrote: Sure. But in general, I am assuming this "Graph is unexpectedly null when DStream is being serialized" must mean something. Under which circumstances would such an exception trigger? On Tue, Apr 21, 2015 at 4:12 PM, Tathagata Das t...@databricks.com wrote: Yeah, I am not sure what is going on. The only way to figure it out is to take a look at the disassembled bytecode using javap. TD On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud j...@tellapart.com wrote: At this point I am assuming that nobody has an idea... I am still going to give it a last shot just in case it was missed by some people :) Thanks, On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud j...@tellapart.com wrote: Hey, so I start the context at the very end when all the piping is done. BTW a foreachRDD will be called on the resulting dstream.map() right after that. The puzzling thing is why removing the context bounds solves the problem... What does this exception mean in general? On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com wrote: When are you getting this exception? After starting the context? TD On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud j...@tellapart.com wrote: Hi, I am getting this serialization exception and I am not too sure what "Graph is unexpectedly null when DStream is being serialized" means. 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Task not serializable) Exception in thread "Driver" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1435) at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438) [...] Caused by: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized. at org.apache.spark.streaming.dstream.DStream$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:420) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:403) The operation comes down to something like this: dstream.map(tuple => { val w = StreamState.fetch[K,W](state.prefixKey, tuple._1) (tuple._1, (tuple._2, w)) }) And StreamState being a very simple standalone object: object StreamState { def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K) : Option[V] = None } However, if I remove the context bounds from K in fetch (i.e., removing ClassTag and Ordering), then everything is fine. If anyone has some pointers, I'd really appreciate it. Thanks,
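Jean-Pascal's diagnosis (the closure pulled in the enclosing instance, including its unserializable DStream field) has a direct analogue in Python's `pickle`, shown below as an illustration only; it is not Spark's ClosureCleaner or Java serialization. A bound method drags its whole instance along, while a function built from locals does not. `A`, `fetch` and `is_picklable` are hypothetical names. In Scala the corresponding remedy is the same in spirit: copy what the closure needs (for example the `ClassTag`) into local vals so the enclosing instance is not referenced; we have not verified this against the exact code in the thread.

```python
import pickle
import threading
from functools import partial


def fetch(prefix_key, key):
    """Module-level helper, pickled by reference; it captures nothing."""
    return None


class A:
    def __init__(self):
        # Stands in for a field that cannot be serialized (like the DStream field).
        self.lock = threading.Lock()
        self.prefix_key = "p"

    def lookup(self, key):
        return fetch(self.prefix_key, key)

    def fun_capturing_self(self):
        # A bound method carries `self`, and therefore self.lock, with it.
        return self.lookup

    def fun_with_locals(self):
        # Copy only what is needed; the enclosing instance is not captured.
        return partial(fetch, self.prefix_key)


def is_picklable(obj):
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False
```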
Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized
It is kind of unexpected; I can imagine a real scenario under which it should trigger. But obviously I am missing something :) TD On Tue, Apr 21, 2015 at 4:23 PM, Jean-Pascal Billaud j...@tellapart.com wrote: Sure. But in general, I am assuming this "Graph is unexpectedly null when DStream is being serialized" must mean something. Under which circumstances would such an exception trigger? On Tue, Apr 21, 2015 at 4:12 PM, Tathagata Das t...@databricks.com wrote: Yeah, I am not sure what is going on. The only way to figure it out is to take a look at the disassembled bytecode using javap. TD On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud j...@tellapart.com wrote: At this point I am assuming that nobody has an idea... I am still going to give it a last shot just in case it was missed by some people :) Thanks, On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud j...@tellapart.com wrote: Hey, so I start the context at the very end when all the piping is done. BTW a foreachRDD will be called on the resulting dstream.map() right after that. The puzzling thing is why removing the context bounds solves the problem... What does this exception mean in general? On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com wrote: When are you getting this exception? After starting the context? TD On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud j...@tellapart.com wrote: Hi, I am getting this serialization exception and I am not too sure what "Graph is unexpectedly null when DStream is being serialized" means. 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Task not serializable) Exception in thread "Driver" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1435) at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438) [...] Caused by: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized. at org.apache.spark.streaming.dstream.DStream$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:420) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:403) The operation comes down to something like this: dstream.map(tuple => { val w = StreamState.fetch[K,W](state.prefixKey, tuple._1) (tuple._1, (tuple._2, w)) }) And StreamState being a very simple standalone object: object StreamState { def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K) : Option[V] = None } However, if I remove the context bounds from K in fetch (i.e., removing ClassTag and Ordering), then everything is fine. If anyone has some pointers, I'd really appreciate it. Thanks,
Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized
Yeah, I am not sure what is going on. The only way to figure it out is to take a look at the disassembled bytecode using javap. TD On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud j...@tellapart.com wrote: At this point I am assuming that nobody has an idea... I am still going to give it a last shot just in case it was missed by some people :) Thanks, On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud j...@tellapart.com wrote: Hey, so I start the context at the very end when all the piping is done. BTW a foreachRDD will be called on the resulting dstream.map() right after that. The puzzling thing is why removing the context bounds solves the problem... What does this exception mean in general? On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com wrote: When are you getting this exception? After starting the context? TD On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud j...@tellapart.com wrote: Hi, I am getting this serialization exception and I am not too sure what "Graph is unexpectedly null when DStream is being serialized" means. 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Task not serializable) Exception in thread "Driver" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1435) at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438) [...] Caused by: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized. at org.apache.spark.streaming.dstream.DStream$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:420) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:403) The operation comes down to something like this: dstream.map(tuple => { val w = StreamState.fetch[K,W](state.prefixKey, tuple._1) (tuple._1, (tuple._2, w)) }) And StreamState being a very simple standalone object: object StreamState { def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K) : Option[V] = None } However, if I remove the context bounds from K in fetch (i.e., removing ClassTag and Ordering), then everything is fine. If anyone has some pointers, I'd really appreciate it. Thanks,
Re: SparkStreaming onStart not being invoked on CustomReceiver attached to master with multiple workers
Responses inline. On Mon, Apr 20, 2015 at 3:27 PM, Ankit Patel patel7...@hotmail.com wrote: What you said is correct, and I am expecting the printlns to be in my console or my SparkUI. I do not see them in either place. Can you actually log in to the machine running the executor which runs the receiver? And then look at the executor logs on that machine to see whether the expected onStart logs are there? However, if you run the program, then the printlns do print for the constructor of the receiver and for the foreach statements with total count 0. That's expected. In all cases, with or without master, the receiver object is constructed in the driver. With master, the receiver gets serialized and sent to a worker machine, and the rest of the prints go to the executor logs. When you run it in regular mode with no master attached, then you will see the counts being printed out in the console as well. Please compile my program and try it out; I have spent significant time debugging where it can go wrong and could not find an answer. As I said, check out the executor logs on the worker machine running the receiver. You can identify that machine from the Streaming tab in the Spark UI. I also see the starting receiver logs from Spark when no master is defined, but do not see them when there is one. Also, I am running some other simple code with spark-submit with printlns, and I do see them in my SparkUI, but not for Spark Streaming. That's expected. Same reason. Without master, receiver logs are in the same driver process. With master, they go to executor logs. As such, it's not clear what you are trying to debug. If you just want to see printlns, then this seems to be the expected behavior; nothing is wrong. Use no master for convenient debugging (you can see all logs and prints), and then run in distributed mode for actual deployment (where it's harder to see those logs and prints).
Thanks, Ankit -- From: t...@databricks.com Date: Mon, 20 Apr 2015 13:29:31 -0700 Subject: Re: SparkStreaming onStart not being invoked on CustomReceiver attached to master with multiple workers To: patel7...@hotmail.com CC: ak...@sigmoidanalytics.com; user@spark.apache.org Well, the receiver always runs as part of an executor. When running locally (that is, spark-submit without --master), the executor is in the same process as the driver, so you see the printlns. If you are running with --master spark://cluster, then the executors are running in different processes and possibly on different nodes. Hence you don't see the printlns in the output of the driver process. If you look at the output of the executors in the Spark UI, then you may find those prints. TD On Mon, Apr 20, 2015 at 5:16 AM, Ankit Patel patel7...@hotmail.com wrote: The code I've written is simple, as it just invokes a thread and calls a store method on the Receiver class. I see this code with printlns working fine when I try spark-submit --jars jar --class test.TestCustomReceiver jar However, it does not work when I try the same command above with --master spark://masterURL: spark-submit --master spark://masterURL --jars jar --class test.TestCustomReceiver jar I also tried setting the master in the conf that I created, but that does not work either. I do not see the onStart println being printed when I use the --master option. Please advise. Also, the master I am attaching to has multiple workers across hosts with many threads available to it.
The code is pasted below (Classes: TestReceiver, TestCustomReceiver): package test; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.receiver.Receiver; public class TestReceiver extends Receiver<String> { public TestReceiver() { super(StorageLevel.MEMORY_ONLY()); System.out.println("Ankit: Created TestReceiver"); } @Override public void onStart() { System.out.println("Start TestReceiver"); new TestThread().start(); } public void onStop() {} @SuppressWarnings("unused") private class TestThread extends Thread { @Override public void run() { while (true) { try { sleep((long) (Math.random() * 3000)); } catch (Exception e) { e.printStackTrace(); } store("Time: " + System.currentTimeMillis()); } } } } package test; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import
Re: RAM management during cogroup and join
Significant optimizations can be made by doing the join/cogroup in a smart way. If you have to join streaming RDDs with the same batch RDD, then you can first partition the batch RDD using a partitioner and cache it, and then use the same partitioner on the streaming RDDs. That would make sure that the large batch RDD is not repartitioned repeatedly for the cogroup; only the small streaming RDDs are partitioned. HTH TD On Wed, Apr 15, 2015 at 1:11 PM, Evo Eftimov evo.efti...@isecc.com wrote: There are indications that joins in Spark are implemented with / based on the cogroup function/primitive/transform. So let me focus first on cogroup: it returns a result which is an RDD consisting of essentially ALL elements of the cogrouped RDDs. Said another way, for every key in each of the cogrouped RDDs there is at least one element from at least one of the cogrouped RDDs. That would mean that when smaller, moreover streaming, e.g. JavaPairDStream RDDs keep getting joined with a much larger batch RDD, that would result in RAM allocated for multiple instances of the result (cogrouped) RDD, a.k.a. essentially the large batch RDD and some more... Obviously the RAM will get returned when the DStream RDDs get discarded, and they do on a regular basis, but still that seems like an unnecessary spike in RAM consumption. I have two questions: 1. Is there any way to control the cogroup process more precisely, e.g. tell it to include in the cogrouped RDD only elements where there is at least one element from EACH of the cogrouped RDDs per given key? Based on the current cogroup API this is not possible. 2. If cogroup is really such a sledgehammer, and the joins are based on cogroup, then even though they present a prettier picture in terms of the end result visible to the end user, does that mean that under the hood the same atrocious RAM consumption is still going on? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RAM-management-during-cogroup-and-join-tp22505.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
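TD's co-partitioning advice can be modeled in plain Python: if both sides are hash-partitioned with the same rule and partition count, matching keys end up at the same partition index, so each pair of partitions can be joined locally and only the small side needs partitioning per batch. This is a toy model, not Spark's shuffle, and the helper names are hypothetical. In Spark the corresponding steps would be `partitionBy(new HashPartitioner(n))` plus `cache()` on the batch RDD and joining with that same partitioner, which we believe `join` accepts as an argument; check the PairRDDFunctions API of your version.

```python
from collections import defaultdict


def hash_partition(pairs, num_partitions):
    """Assign each (key, value) pair to a partition by hash(key) % n, using
    the same rule for every dataset, like a shared HashPartitioner."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        parts[hash(key) % num_partitions].append((key, value))
    return parts


def join_copartitioned(left_parts, right_parts):
    """Inner join, partition by partition. Correct only because both sides
    used the same partitioning rule, so matching keys share a partition
    index and no data has to move between partitions."""
    out = []
    for lp, rp in zip(left_parts, right_parts):
        index = defaultdict(list)
        for k, v in lp:
            index[k].append(v)
        for k, w in rp:
            for v in index.get(k, []):
                out.append((k, (v, w)))
    return out


# Partition the large "batch" side once (in Spark: partitionBy + cache) ...
big_parts = hash_partition([("a", 1), ("b", 2), ("c", 3)], 4)
# ... and only partition each small streaming batch with the same rule.
batch_parts = hash_partition([("a", "x"), ("c", "y"), ("d", "z")], 4)
joined = join_copartitioned(big_parts, batch_parts)
```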
Re: RAM management during cogroup and join
Agreed. On Wed, Apr 15, 2015 at 1:29 PM, Evo Eftimov evo.efti...@isecc.com wrote: That has been done, Sir, and represents further optimizations; the objective here was to confirm whether cogroup always results in the previously described “greedy” explosion of the number of elements included and RAM allocated for the result RDD. The optimizations mentioned still don’t change the total number of elements included in the result RDD and the RAM allocated, right? From: Tathagata Das [mailto:t...@databricks.com] Sent: Wednesday, April 15, 2015 9:25 PM To: Evo Eftimov Cc: user Subject: Re: RAM management during cogroup and join Significant optimizations can be made by doing the join/cogroup in a smart way. If you have to join streaming RDDs with the same batch RDD, then you can first partition the batch RDD using a partitioner and cache it, and then use the same partitioner on the streaming RDDs. That would make sure that the large batch RDD is not repartitioned repeatedly for the cogroup; only the small streaming RDDs are partitioned. HTH TD On Wed, Apr 15, 2015 at 1:11 PM, Evo Eftimov evo.efti...@isecc.com wrote: There are indications that joins in Spark are implemented with / based on the cogroup function/primitive/transform. So let me focus first on cogroup: it returns a result which is an RDD consisting of essentially ALL elements of the cogrouped RDDs. Said another way, for every key in each of the cogrouped RDDs there is at least one element from at least one of the cogrouped RDDs. That would mean that when smaller, moreover streaming, e.g. JavaPairDStream RDDs keep getting joined with a much larger batch RDD, that would result in RAM allocated for multiple instances of the result (cogrouped) RDD, a.k.a. essentially the large batch RDD and some more... Obviously the RAM will get returned when the DStream RDDs get discarded, and they do on a regular basis, but still that seems like an unnecessary spike in RAM consumption. I have two questions: 1. Is there any way to control the cogroup process more precisely, e.g. tell it to include in the cogrouped RDD only elements where there is at least one element from EACH of the cogrouped RDDs per given key? Based on the current cogroup API this is not possible. 2. If cogroup is really such a sledgehammer, and the joins are based on cogroup, then even though they present a prettier picture in terms of the end result visible to the end user, does that mean that under the hood the same atrocious RAM consumption is still going on?
Re: RAM management during cogroup and join
Well, DStream joins are nothing but RDD joins at its core. However, there are more optimizations that you using DataFrames and Spark SQL joins. With the schema, there is a greater scope for optimizing the joins. So converting RDDs from streaming and the batch RDDs to data frames, and then applying joins may improve performance. TD On Wed, Apr 15, 2015 at 1:50 PM, Evo Eftimov evo.efti...@isecc.com wrote: Thank you Sir, and one final confirmation/clarification - are all forms of joins in the Spark API for DStream RDDs based on cogroup in terms of their internal implementation *From:* Tathagata Das [mailto:t...@databricks.com] *Sent:* Wednesday, April 15, 2015 9:48 PM *To:* Evo Eftimov *Cc:* user *Subject:* Re: RAM management during cogroup and join Agreed. On Wed, Apr 15, 2015 at 1:29 PM, Evo Eftimov evo.efti...@isecc.com wrote: That has been done Sir and represents further optimizations – the objective here was to confirm whether cogroup always results in the previously described “greedy” explosion of the number of elements included and RAM allocated for the result RDD The optimizations mentioned still don’t change the total number of elements included in the result RDD and RAM allocated – right? *From:* Tathagata Das [mailto:t...@databricks.com] *Sent:* Wednesday, April 15, 2015 9:25 PM *To:* Evo Eftimov *Cc:* user *Subject:* Re: RAM management during cogroup and join Significant optimizations can be made by doing the joining/cogroup in a smart way. If you have to join streaming RDDs with the same batch RDD, then you can first partition the batch RDDs using a partitions and cache it, and then use the same partitioner on the streaming RDDs. That would make sure that the large batch RDDs is not partitioned repeatedly for the cogroup, only the small streaming RDDs are partitioned. 
HTH TD On Wed, Apr 15, 2015 at 1:11 PM, Evo Eftimov evo.efti...@isecc.com wrote: There are indications that joins in Spark are implemented with / based on the cogroup function/primitive/transform. So let me focus first on cogroup: it returns a result which is an RDD consisting of essentially ALL elements of the cogrouped RDDs. Said another way, for every key in each of the cogrouped RDDs there is at least one element from at least one of the cogrouped RDDs. That would mean that when smaller, moreover streaming, e.g. JavaPairDStream RDDs keep getting joined with a much larger batch RDD, RAM gets allocated for multiple instances of the result (cogrouped) RDD, a.k.a. essentially the large batch RDD and some more... Obviously the RAM will get returned when the DStream RDDs get discarded, and they do on a regular basis, but that still seems like an unnecessary spike in RAM consumption. I have two questions: 1. Is there any way to control the cogroup process more precisely, e.g. tell it to include in the cogrouped RDD only elements where there is at least one element from EACH of the cogrouped RDDs per given key? Based on the current cogroup API this is not possible. 2. If cogroup is really such a sledgehammer, and the joins are based on cogroup, then even though they present a prettier picture in terms of the end result visible to the end user, does that mean that under the hood the same atrocious RAM consumption is still going on? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RAM-management-during-cogroup-and-join-tp22505.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
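The following small plain-Scala model makes the cogroup/join discussion in this thread concrete. It uses ordinary collections instead of RDDs, so it sketches the semantics and is not Spark code. `CogroupModel` and its method names are made up for illustration. The model shows three things:

- cogroup keeps every key seen on either side.
- Keeping only keys present on both sides is a filter applied after the cogroup.
- An inner join can be expressed as cogroup plus a per-key cross product. As far as I know, this is how `PairRDDFunctions.join` is written in Spark 1.x: cogroup followed by `flatMapValues`.

```scala
// Plain-collections model of pair-RDD cogroup and join (illustrative, not Spark code).
object CogroupModel {
  // cogroup: every key that appears on EITHER side shows up once, with all of its
  // values from each side (an empty Seq when the key is missing on that side).
  def cogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
    val l: Map[K, Seq[V]] = left.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    val r: Map[K, Seq[W]] = right.groupBy(_._1).map { case (k, kws) => k -> kws.map(_._2) }
    (l.keySet ++ r.keySet).iterator.map { k =>
      k -> ((l.getOrElse(k, Seq.empty[V]), r.getOrElse(k, Seq.empty[W])))
    }.toMap
  }

  // Keep only keys with at least one value on EACH side. This filters the
  // cogroup result; it does not avoid building the full cogroup first.
  def cogroupBothSides[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] =
    cogroup(left, right).filter { case (_, (vs, ws)) => vs.nonEmpty && ws.nonEmpty }

  // Inner join = cogroup + cross product of the two value lists per key.
  // Keys missing on either side yield an empty product and so disappear.
  def join[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] =
    cogroup(left, right).toSeq.flatMap { case (k, (vs, ws)) =>
      for (v <- vs; w <- ws) yield (k, (v, w))
    }
}
```

In Spark, the equivalent of `cogroupBothSides` would be `rdd1.cogroup(rdd2).filter { case (_, (vs, ws)) => vs.nonEmpty && ws.nonEmpty }`. It only trims the result after the cogroup has run. By itself, it does not reduce the shuffle or the memory used to build the groups.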
Re: Is it feasible to keep millions of keys in state of Spark Streaming job for two months?
Can you clarify more on what you want to do after querying? Is the batch not completed until the querying and subsequent processing have completed? On Tue, Apr 14, 2015 at 10:36 PM, Krzysztof Zarzycki k.zarzy...@gmail.com wrote: Thank you Tathagata, very helpful answer. Though I would like to highlight that recent stream processing systems are trying to help users implement the use case of holding such large (like 2 months of data) states. I would mention here Samza state management http://samza.apache.org/learn/documentation/0.9/container/state-management.html and Trident state management https://storm.apache.org/documentation/Trident-state. I'm waiting for Spark to help with that too, because I generally prefer this technology :) But considering holding state in Cassandra with Spark Streaming, I understand we're not talking here about using Cassandra as input or output (or making use of spark-cassandra-connector https://github.com/datastax/spark-cassandra-connector). We're talking here about querying Cassandra from map/mapPartitions functions. I have one question about it: Is it possible to query Cassandra asynchronously within Spark Streaming? And while doing so, is it possible to take the next batch of rows while the previous one is waiting on Cassandra I/O? I think (but I'm not sure) this generally asks whether several consecutive windows can interleave (because they take long to process)? Let's draw it: --|query Cassandra asynchronously--- window1 --- window2 As I write this, I'm starting to believe they can, because windows are time-triggered, not triggered when the previous window has finished... But it's better to ask :) 2015-04-15 2:08 GMT+02:00 Tathagata Das t...@databricks.com: Fundamentally, stream processing systems are designed for processing streams of data, not for storing large volumes of data for a long period of time.
So if you have to maintain that much state for months, then it's best to use another system that is designed for long-term storage (like Cassandra), which has proper support for making all that state fault-tolerant, highly performant, etc. So yes, the best option is to use Cassandra for the state and have Spark Streaming jobs access the state from Cassandra. There are a number of optimizations that can be done. It's not too hard to build a simple on-demand populated cache (a singleton hash map, for example) that speeds up access from Cassandra, with all updates written through the cache. This is a common use of Spark Streaming + Cassandra/HBase. Regarding the performance of updateStateByKey, we are aware of the limitations, and we will improve it soon :) TD On Tue, Apr 14, 2015 at 12:34 PM, Krzysztof Zarzycki k.zarzy...@gmail.com wrote: Hey guys, could you please help me with a question I asked on Stackoverflow: https://stackoverflow.com/questions/29635681/is-it-feasible-to-keep-millions-of-keys-in-state-of-spark-streaming-job-for-two ? I'll be really grateful for your help! I'm also pasting the question below: I'm trying to solve a (simplified here) problem in Spark Streaming: Let's say I have a log of events made by users, where each event is a tuple (user name, activity, time), e.g.: (user1, view, 2015-04-14T21:04Z) (user1, click, 2015-04-14T21:05Z) Now I would like to gather events by user to do some analysis of that. Let's say that the output is some analysis of: (user1, List((view, 2015-04-14T21:04Z),(click, 2015-04-14T21:05Z))) The events should be kept for as long as *2 months*. During that time there might be around *500 million* such events, and *millions of unique* users, which are the keys here. *My questions are:* - Is it feasible to do such a thing with updateStateByKey on DStream, when I have millions of keys stored? - Am I right that DStream.window is of no use here, when I have a 2-month window and would like to have a slide of a few seconds? P.S.
I found out that updateStateByKey is called on all the keys on every slide, which means it will be called millions of times every few seconds. That makes me doubt this design, and I'm rather thinking about alternative solutions like: - using Cassandra for state - using Trident state (with Cassandra probably) - using Samza with its state management.
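TD suggests an "on-demand populated cache" in which "all updates are written through the cache". Below is a plain-Scala sketch of that idea:

- `StateStore` is a hypothetical interface standing in for the Cassandra table. In a real job its methods would wrap Cassandra driver calls.
- `WriteThroughCache` is an illustrative name, not a Spark or Cassandra API.
- To get the "singleton hash map" TD mentions, you would keep one instance in a Scala `object`. Each executor JVM then creates it once and reuses it across batches.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for the Cassandra table that holds per-user state.
trait StateStore[K, V] {
  def read(key: K): Option[V]
  def write(key: K, value: V): Unit
}

// Read-through / write-through cache in front of the store (illustrative sketch).
final class WriteThroughCache[K, V](store: StateStore[K, V]) {
  // Thread-safe map, since several tasks in one executor JVM may share the instance.
  private val cache = TrieMap.empty[K, V]

  // Serve from memory when possible; on a miss, load from the store and remember it.
  def get(key: K): Option[V] =
    cache.get(key).orElse {
      val loaded = store.read(key)
      loaded.foreach(v => cache.put(key, v))
      loaded
    }

  // Write to the store first, then the cache, so the cache never holds a value
  // that the store does not have if the write fails.
  def update(key: K, value: V): Unit = {
    store.write(key, value)
    cache.put(key, value)
  }
}
```

This sketch caches no misses and never evicts anything. With millions of users you would probably want a size bound or expiry, which it leaves out.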
Re: How to do dispatching in Streaming?
It may be worthwhile to architect the computation in a different way. dstream.foreachRDD { rdd => rdd.foreach { record => /* do different things for each record based on filters */ } } TD On Sun, Apr 12, 2015 at 7:52 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I have a Kafka topic that contains dozens of different types of messages, and for each one I'll need to create a DStream. Currently I have to filter the Kafka stream over and over, which is very inefficient. So what's the best way to do dispatching in Spark Streaming? (one DStream -> multiple DStreams) Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
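TD's single-pass suggestion can be sketched as one function. It parses each record once and hands it to a handler chosen by the record's type, instead of filtering the Kafka stream once per type. The `type|payload` message format, `Msg` and `Dispatch` are assumptions made for illustration. Inside Spark you would call something like this from `rdd.foreachPartition` (or `rdd.foreach`). The handlers would then have to be serializable, because they run on the executors.

```scala
object Dispatch {
  // Hypothetical parsed form of a Kafka message "<type>|<payload>".
  final case class Msg(kind: String, payload: String)

  // Returns None for records that don't match the assumed format.
  def parse(raw: String): Option[Msg] = raw.split("\\|", 2) match {
    case Array(kind, payload) => Some(Msg(kind, payload))
    case _                    => None
  }

  // One pass over the records: each record is parsed once and routed to the
  // handler for its type; unknown types go to `fallback`, malformed records are skipped.
  def dispatch(records: Iterator[String],
               handlers: Map[String, String => Unit],
               fallback: String => Unit): Unit =
    records.flatMap(r => parse(r)).foreach { m =>
      handlers.getOrElse(m.kind, fallback)(m.payload)
    }
}
```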
Re: Is it feasible to keep millions of keys in state of Spark Streaming job for two months?
Fundamentally, stream processing systems are designed for processing streams of data, not for storing large volumes of data for a long period of time. So if you have to maintain that much state for months, then it's best to use another system that is designed for long-term storage (like Cassandra), which has proper support for making all that state fault-tolerant, highly performant, etc. So yes, the best option is to use Cassandra for the state and have Spark Streaming jobs access the state from Cassandra. There are a number of optimizations that can be done. It's not too hard to build a simple on-demand populated cache (a singleton hash map, for example) that speeds up access from Cassandra, with all updates written through the cache. This is a common use of Spark Streaming + Cassandra/HBase. Regarding the performance of updateStateByKey, we are aware of the limitations, and we will improve it soon :) TD On Tue, Apr 14, 2015 at 12:34 PM, Krzysztof Zarzycki k.zarzy...@gmail.com wrote: Hey guys, could you please help me with a question I asked on Stackoverflow: https://stackoverflow.com/questions/29635681/is-it-feasible-to-keep-millions-of-keys-in-state-of-spark-streaming-job-for-two ? I'll be really grateful for your help! I'm also pasting the question below: I'm trying to solve a (simplified here) problem in Spark Streaming: Let's say I have a log of events made by users, where each event is a tuple (user name, activity, time), e.g.: (user1, view, 2015-04-14T21:04Z) (user1, click, 2015-04-14T21:05Z) Now I would like to gather events by user to do some analysis of that. Let's say that the output is some analysis of: (user1, List((view, 2015-04-14T21:04Z),(click, 2015-04-14T21:05Z))) The events should be kept for as long as *2 months*. During that time there might be around *500 million* such events, and *millions of unique* users, which are the keys here. *My questions are:* - Is it feasible to do such a thing with updateStateByKey on DStream, when I have millions of keys stored?
- Am I right that DStream.window is of no use here, when I have a 2-month window and would like to have a slide of a few seconds? P.S. I found out that updateStateByKey is called on all the keys on every slide, which means it will be called millions of times every few seconds. That makes me doubt this design, and I'm rather thinking about alternative solutions like: - using Cassandra for state - using Trident state (with Cassandra probably) - using Samza with its state management.
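The P.S. above is the crux of the cost. updateStateByKey calls the update function for every key in the state on every batch, even for keys with no new data, and returning None removes a key. The plain-Scala model below simulates one batch of that behaviour. It is not Spark code, and `UpdateStateModel`, `step` and `keepRecent` are illustrative names. Its update function drops events older than a retention period, so idle users eventually disappear from the state. In Spark, a function of the same shape, `(Seq[V], Option[S]) => Option[S]`, is what you would pass to `pairDStream.updateStateByKey`. Passing `now` explicitly is an assumption made for testability; inside a real job you would typically read the clock inside the function.

```scala
object UpdateStateModel {
  // (activity, event time in epoch millis)
  type Event = (String, Long)

  // Update function with the shape updateStateByKey expects:
  // (new values for this key in the batch, previous state) => new state.
  // Events older than `retentionMs` (relative to `now`) are dropped; returning
  // None removes the key from the state entirely.
  def keepRecent(retentionMs: Long, now: Long): (Seq[Event], Option[Seq[Event]]) => Option[Seq[Event]] =
    (newEvents, old) => {
      val kept = (old.getOrElse(Seq.empty[Event]) ++ newEvents).filter { case (_, t) => now - t <= retentionMs }
      if (kept.isEmpty) None else Some(kept)
    }

  // One batch of the model: the update function runs for every key already in
  // the state AND every key with new data, so the work per batch grows with the
  // total number of keys, not just the keys that received events.
  def step[K, V, S](state: Map[K, S], batch: Seq[(K, V)], update: (Seq[V], Option[S]) => Option[S]): Map[K, S] = {
    val newValues: Map[K, Seq[V]] = batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    (state.keySet ++ newValues.keySet).flatMap { k =>
      update(newValues.getOrElse(k, Seq.empty[V]), state.get(k)).map(s => k -> s)
    }.toMap
  }
}
```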
Re: Seeing message about receiver not being de-registered on invoking Streaming context stop
What version of Spark are you using? There was a known bug which could be causing this. It got fixed in Spark 1.3. TD On Mon, Apr 13, 2015 at 11:44 PM, Akhil Das ak...@sigmoidanalytics.com wrote: When you say "done fetching documents", does it mean that you are stopping the StreamingContext (ssc.stop), or do you mean it completed fetching documents for a batch? If possible, you could paste your custom receiver code so that we can have a look at it. Thanks Best Regards On Tue, Apr 7, 2015 at 8:46 AM, Hari Polisetty hpoli...@icloud.com wrote: My application is running Spark in local mode and I have a Spark Streaming Listener as well as a Custom Receiver. When the receiver is done fetching all documents, it invokes “stop” on itself. I see the StreamingListener getting a callback on “onReceiverStopped”, where I stop the streaming context. However, I see the following message in my logs: 2015-04-06 16:41:51,193 WARN [Thread-66] com.amazon.grcs.gapanalysis.spark.streams.ElasticSearchResponseReceiver.onStop - Stopped receiver 2015-04-06 16:41:51,193 ERROR [sparkDriver-akka.actor.default-dispatcher-17] org.apache.spark.Logging$class.logError - Deregistered receiver for stream 0: AlHURLEY 2015-04-06 16:41:51,202 WARN [Executor task launch worker-2] org.apache.spark.Logging$class.logWarning - Stopped executor without error 2015-04-06 16:41:51,203 WARN [StreamingListenerBus] org.apache.spark.Logging$class.logWarning - All of the receivers have not deregistered, Map(0 -> ReceiverInfo(0,ElasticSearchResponseReceiver-0,null,false,localhost,HURLEY)) What am I missing or doing wrong?
Re: sbt-assembly spark-streaming-kinesis-asl error
Have you tried marking only spark-streaming-kinesis-asl as not provided, and the rest as provided? Then you will not even need to add kinesis-asl.jar in the spark-submit. TD On Tue, Apr 14, 2015 at 2:27 PM, Mike Trienis mike.trie...@orcsol.com wrote: Richard, Your response was very helpful and actually resolved my issue. In case others run into a similar issue, I followed this procedure: - Upgraded to Spark 1.3.0 - Marked all Spark-related libraries as provided - Included Spark's transitive library dependencies, so my build.sbt file has libraryDependencies ++= { Seq( "org.apache.spark" %% "spark-core" % "1.3.0" % "provided", "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided", "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided", "joda-time" % "joda-time" % "2.2", "org.joda" % "joda-convert" % "1.2", "com.amazonaws" % "aws-java-sdk" % "1.8.3", "com.amazonaws" % "amazon-kinesis-client" % "1.2.0") } and submitting a Spark job can be done via sh ./spark-1.3.0-bin-cdh4/bin/spark-submit --jars spark-streaming-kinesis-asl_2.10-1.3.0.jar --verbose --class com.xxx.MyClass target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar Thanks again Richard! Cheers Mike. On Tue, Apr 14, 2015 at 11:01 AM, Richard Marscher rmarsc...@localytics.com wrote: Hi, I've gotten an application working with sbt-assembly and Spark, so I thought I'd present an option. In my experience, trying to bundle any of the Spark libraries in your uber jar is going to be a major pain. There will be a lot of deduplication to work through, and even if you resolve it, it is easy to do it incorrectly. I considered it an intractable problem. So the alternative is to not include those jars in your uber jar. For this to work you will need the same libraries on the classpath of your Spark cluster and your driver program (if you are running that as an application and not just using spark-submit). As for your NoClassDefFoundError, you are either missing Joda-Time in your runtime classpath or have conflicting versions.
It looks like something related to AWS wants to use it. Check your uber jar to see if it's including org/joda/time, and check the classpath of your Spark cluster as well. For example: I use Spark 1.3.0 on Hadoop 1.x, which has an uber jar spark-assembly-1.3.0-hadoop1.0.4.jar in its 'lib' directory. At one point in Spark 1.2 I found a conflict between the httpclient version that my uber jar pulled in for the AWS libraries and the one bundled in the Spark uber jar. I hand-patched the Spark uber jar to remove the offending httpclient bytecode to resolve the issue. You may be facing a similar situation. I hope that gives some ideas for resolving your issue. Regards, Rich On Tue, Apr 14, 2015 at 1:14 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi Vadim, After removing provided from "org.apache.spark" %% "spark-streaming-kinesis-asl" I ended up with a huge number of deduplicate errors: https://gist.github.com/trienism/3d6f8d6b7ff5b7cead6a It would be nice if you could share some pieces of your mergeStrategy code for reference. Also, after adding provided back to spark-streaming-kinesis-asl and submitting the Spark job with the spark-streaming-kinesis-asl jar file sh /usr/lib/spark/bin/spark-submit --verbose --jars lib/spark-streaming-kinesis-asl_2.10-1.2.0.jar --class com.xxx.DataConsumer target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar I still end up with the following error... Exception in thread "main" java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeFormat at com.amazonaws.auth.AWS4Signer.<clinit>(AWS4Signer.java:44) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:379) Has anyone else run into this issue?
On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I don't believe the Kinesis ASL should be provided. I used mergeStrategy successfully to produce an uber jar. FYI, I've been trying to consume data out of Kinesis with Spark, with no success :( Would be curious to know if you got it working. Vadim On Apr 13, 2015, at 9:36 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi All, I am having trouble building a fat jar file through sbt-assembly. [warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename' [warn] Merging 'META-INF/NOTICE' with strategy 'rename' [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename' [warn] Merging 'META-INF/LICENSE' with strategy 'rename' [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard' [warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with
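Here is a build.sbt sketch of TD's suggestion. Spark's own modules stay "provided", because they are already in the cluster's spark-assembly jar. Only spark-streaming-kinesis-asl, plus the AWS/Joda libraries from Mike's build, stays in compile scope, so sbt-assembly bundles them into the uber jar. Kinesis ASL is not included in the default Spark assembly. The versions are copied from Mike's build.sbt earlier in this thread. I have not verified that this exact set resolves without conflicts.

```scala
// build.sbt (sketch)
libraryDependencies ++= Seq(
  // Already on the cluster's classpath via the Spark assembly, so not bundled.
  "org.apache.spark" %% "spark-core"      % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided",
  // Not part of the default Spark assembly, so bundled into the uber jar.
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0",
  "joda-time"     % "joda-time"             % "2.2",
  "org.joda"      % "joda-convert"          % "1.2",
  "com.amazonaws" % "aws-java-sdk"          % "1.8.3",
  "com.amazonaws" % "amazon-kinesis-client" % "1.2.0"
)
```

With this layout the job should be submittable without `--jars`, e.g. `spark-submit --class com.xxx.MyClass target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar`. If the deduplicate errors reported earlier in this thread come back, you will still need a merge strategy in the sbt-assembly settings.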
Re: not found: value SQLContextSingleton
Have you created a class called SQLContextSingleton? If so, is it on the compile classpath? On Fri, Apr 10, 2015 at 6:47 AM, Mukund Ranjan (muranjan) muran...@cisco.com wrote: Hi All, Any idea why I am getting this error? wordsTenSeconds.foreachRDD((rdd: RDD[String], time: Time) => { val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) /* This line is creating this error */ }) — E R R O R —— [error] /Users/muranjan/workspace/kafka/src/main/scala/kafka/KafkaConsumer.scala:103: not found: value SQLContextSingleton [error] val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) [error]^ [error] /Users/muranjan/workspace/kafka/src/main/scala/kafka/KafkaConsumer.scala:158: not found: value SQLContextSingleton [error] val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) [error]^ [error] two errors found [error] (compile:compileIncremental) Compilation failed — E R R O R —— Thanks, Mukund
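SQLContextSingleton is not a Spark class. The DataFrame example in the Spark Streaming programming guide defines it in the application itself as a lazily instantiated singleton. The code above therefore only compiles if such an object exists in your own sources. The sketch below shows that shape. To keep it self-contained, `SparkContextLike` and `SQLContextLike` are hypothetical stand-ins. In a real job you would use `org.apache.spark.SparkContext` and `org.apache.spark.sql.SQLContext`, created with `new SQLContext(sparkContext)`.

```scala
// Hypothetical stand-ins so this compiles without Spark; replace with
// org.apache.spark.SparkContext and org.apache.spark.sql.SQLContext in a real job.
final class SparkContextLike(val appName: String)
final class SQLContextLike(val sparkContext: SparkContextLike)

// Lazily instantiated singleton: the first call creates the context, and later calls
// (e.g. from every foreachRDD batch) reuse it.
object SQLContextSingleton {
  @transient private var instance: SQLContextLike = null

  def getInstance(sparkContext: SparkContextLike): SQLContextLike = synchronized {
    if (instance == null) {
      instance = new SQLContextLike(sparkContext)
    }
    instance
  }
}
```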
Re: coalesce(*, false) problem
Coalesce tries to reduce the number of partitions to a smaller number of partitions without moving the data around (as much as possible). Since most of the received data is on a few machines (those running receivers), coalesce just makes bigger merged partitions on those machines. Without coalesce: Machine 1 has 10 partitions processing in parallel; Machine 2 has 2 partitions processing in parallel. With coalesce: Machine 1's 10 partitions are merged into 1 partition, processed together and taking 10 times longer; Machine 2's 2 partitions are merged into 1 partition, processed together and taking 2 times longer. Hope this clarifies. TD On Fri, Apr 10, 2015 at 5:16 AM, 邓刚 [Technology Center] triones.d...@vipshop.com wrote: Hi All, We are running a Spark Streaming application. The data source is Kafka, and the Kafka data partitions are not well-distributed, but every receiver on every executor does receive data, just in different amounts. Our data is very large, so we tried a local repartition with coalesce(*, false), but we noticed something odd: most of the tasks run on one executor (see picture one). When we remove the coalesce call, the tasks are distributed better (see picture two). Does anyone know why? *Picture one* *Picture two*
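TD's Machine 1 / Machine 2 example can be reproduced with a simplified plain-Scala model. This is not Spark's actual coalescing algorithm, and `CoalesceModel` and `Part` are made-up names. The model makes three assumptions:

- Each input partition sits on the machine whose receiver produced it.
- Coalescing without a shuffle merges partitions in place, one merged partition per machine.
- Coalescing with a shuffle spreads the records evenly.

In Spark terms, the shuffling variant corresponds to `rdd.coalesce(n, shuffle = true)` / `rdd.repartition(n)`, or `dstream.repartition(n)` on the stream. That is the usual way to spread received data across all executors, at the cost of a shuffle.

```scala
object CoalesceModel {
  // One input partition: the machine holding it (where its receiver ran) and its record count.
  final case class Part(machine: String, records: Int)

  // coalesce(n, shuffle = false), simplified: partitions on the same machine are
  // merged so no data moves. Result: machine -> records in its single merged partition.
  def coalesceNoShuffle(parts: Seq[Part]): Map[String, Int] =
    parts.groupBy(_.machine).map { case (m, ps) => m -> ps.map(_.records).sum }

  // repartition(n) / coalesce(n, shuffle = true), simplified: records are spread
  // evenly over n partitions, which the scheduler can run on any executor.
  def repartitionSizes(parts: Seq[Part], n: Int): Seq[Int] = {
    val total = parts.map(_.records).sum
    (0 until n).map(i => total / n + (if (i < total % n) 1 else 0))
  }
}
```

With 10 partitions of 100 records on machine1 and 2 on machine2, the no-shuffle model gives one task of 1000 records on machine1 and one of 200 on machine2. Repartitioning to 4 gives four tasks of 300 records each.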
Re: Lookup / Access of master data in spark streaming
Responses inline. Hope they help. On Thu, Apr 9, 2015 at 8:20 AM, Amit Assudani aassud...@impetus.com wrote: Hi Friends, I am trying to solve a use case in Spark Streaming and need help finding the right approach to look up / update the master data. Use case (simplified): I have a dataset of entities with three attributes and an identifier/row key in a persistent store. Each attribute, along with the row key, comes from a different stream, so effectively there are 3 source streams. Now whenever any attribute comes in, I want to update/sync the persistent store and do some processing, but the processing requires the latest state of the entity, with the latest values of all three attributes. I wish I could have all the entities cached in some sort of centralized cache (like we have data in HDFS) within Spark Streaming, which could be used for data-local processing. But I assume there is no such thing. Potential approaches I'm thinking of (I suspect the first two are not feasible, but I want to confirm): 1. Are broadcast variables mutable? If yes, can I use one as a cache for all entities, sizing around 100s of GBs, provided I have a cluster with enough RAM? Broadcast variables are not mutable. But you can always create a new broadcast variable when you want and use the latest broadcast variable in your computation. dstream.transform { rdd => val latestBroadcast = getLatestBroadcastVariable() /* fetch existing or update+create new and return */ val transformedRDD = rdd. .. /* use latestBroadcast in RDD transformations */ transformedRDD } Since the transform RDD-to-RDD function runs on the driver every batch interval, it will always use the latest broadcast variable that you want. Note, though, that whenever you create a new broadcast, the next batch may take a little longer, as the data needs to actually be broadcast out.
That can also be made asynchronous by running a simple task (to force the broadcasting out) on any new broadcast variable in a thread separate from the Spark Streaming batch scheduling, but using the same underlying SparkContext. 2. Is there any kind of sticky partitioning possible, so that I route my stream data through the same node where I have the corresponding entities (a subset of the entire store) cached in memory within the JVM / off-heap on the node? This would avoid lookups from the store. You could use updateStateByKey. That is quite sticky, but it does not eliminate the possibility that it can run on a different node. In fact this is necessary for fault-tolerance: what if the node it was supposed to run on goes down? The task will be run on a different node, and you have to design your application such that it can handle that. 3. If I stream the entities from the persistent store into the engine, this becomes a 4th stream, the entity stream. How do I use join / merge to enable streams 1, 2, 3 to look up and update the data from stream 4? Would DStream.join work for a few seconds' worth of data in the attribute streams with all the data in the entity stream? Or do I use transform and, within that, an RDD join? I have doubts about whether I am leaning towards a core Spark approach in Spark Streaming. Depends on what kind of join! If you want to join every batch in a stream with a static dataset (or a rarely updated dataset), then transform+join is the way to go. If you want to join one stream with a window of data from another stream, then DStream.join is the way to go. 4. The last approach, which I think will surely work but want to avoid, is to keep the entities in an IMDB and do lookup/update calls from streams 1, 2 and 3. Any help is deeply appreciated, as this would help me design my system efficiently, and the solution approach may become a beacon for lookup-master-data sorts of problems.
Regards, Amit
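TD's `getLatestBroadcastVariable()` is left undefined above. One way to fill it in, sketched in plain Scala under stated assumptions: a hypothetical `RefreshingHolder` keeps the current value and rebuilds it once the value is older than a maximum age. In a Spark job, the `load` function would call `sc.broadcast(...)` on freshly read master data, and you would likely `unpersist()` the previous broadcast. `getLatest()` would be called inside `dstream.transform`, which runs on the driver each batch. The clock is passed in only so the behaviour can be tested.

```scala
// Hypothetical helper: caches a value and rebuilds it when it is older than maxAgeMs.
final class RefreshingHolder[A](maxAgeMs: Long, now: () => Long)(load: () => A) {
  private var current: Option[A] = None
  private var loadedAt: Long = 0L

  // Returns the cached value while it is fresh; otherwise reloads it.
  def getLatest(): A = synchronized {
    val t = now()
    current match {
      case Some(v) if t - loadedAt < maxAgeMs => v
      case _ =>
        val v = load() // in Spark: e.g. sc.broadcast(readMasterData()), readMasterData being hypothetical
        current = Some(v)
        loadedAt = t
        v
    }
  }
}
```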
Re: Could not compute split, block not found in Spark Streaming Simple Application
Are you running # of receivers = # machines? TD On Thu, Apr 9, 2015 at 9:56 AM, Saiph Kappa saiph.ka...@gmail.com wrote: Sorry, I was getting those errors because my workload was not sustainable. However, I noticed that, by just running the spark-streaming-benchmark ( https://github.com/tdas/spark-streaming-benchmark/blob/master/Benchmark.scala ), I see no difference in execution time, number of processed records, or delay whether I'm using 1 machine or 2 machines with the setup described before (using Spark standalone). Is that normal? On Fri, Mar 27, 2015 at 5:32 PM, Tathagata Das t...@databricks.com wrote: If it is deterministically reproducible, could you generate full DEBUG level logs from the driver and the workers and give them to me? Basically I want to trace through what is happening to the block that is not being found. And can you tell me which cluster manager you are using? Spark Standalone, Mesos or YARN? On Fri, Mar 27, 2015 at 10:09 AM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, I am just running this simple example with machineA: 1 master + 1 worker machineB: 1 worker « val ssc = new StreamingContext(sparkConf, Duration(1000)) val rawStreams = (1 to numStreams).map(_ => ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray val union = ssc.union(rawStreams) union.filter(line => Random.nextInt(1) == 0).map(line => { var sum = BigInt(0); line.toCharArray.foreach(chr => sum += chr.toInt); fib2(sum); sum }).reduceByWindow(_+_, Seconds(1), Seconds(1)).map(s => s"### result: $s").print() » And I'm getting the following exceptions: Log from machineB « 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 132 15/03/27 16:21:35 INFO Executor: Running task 0.0 in stage 27.0 (TID 132) 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 134 15/03/27 16:21:35 INFO Executor: Running task 2.0 in stage 27.0 (TID 134) 15/03/27 16:21:35 INFO TorrentBroadcast: Started reading broadcast variable 24 15/03/27 16:21:35 INFO
CoarseGrainedExecutorBackend: Got assigned task 136 15/03/27 16:21:35 INFO Executor: Running task 4.0 in stage 27.0 (TID 136) 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 138 15/03/27 16:21:35 INFO Executor: Running task 6.0 in stage 27.0 (TID 138) 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 140 15/03/27 16:21:35 INFO Executor: Running task 8.0 in stage 27.0 (TID 140) 15/03/27 16:21:35 INFO MemoryStore: ensureFreeSpace(1886) called with curMem=47117, maxMem=280248975 15/03/27 16:21:35 INFO MemoryStore: Block broadcast_24_piece0 stored as bytes in memory (estimated size 1886.0 B, free 267.2 MB) 15/03/27 16:21:35 INFO BlockManagerMaster: Updated info of block broadcast_24_piece0 15/03/27 16:21:35 INFO TorrentBroadcast: Reading broadcast variable 24 took 19 ms 15/03/27 16:21:35 INFO MemoryStore: ensureFreeSpace(3104) called with curMem=49003, maxMem=280248975 15/03/27 16:21:35 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 3.0 KB, free 267.2 MB) 15/03/27 16:21:35 ERROR Executor: Exception in task 8.0 in stage 27.0 (TID 140) java.lang.Exception: Could not compute split, block input-0-1427473262420 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:701) 15/03/27 16:21:35 ERROR Executor: Exception in task 6.0 in stage 27.0 (TID 138) java.lang.Exception: Could not compute split, block input-0-1427473262418 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51
Re: Continuous WARN messages from BlockManager about block replication
Well, you are running in local mode, so it cannot find another peer to replicate the blocks received from receivers. That's it. It's not a real concern, and that warning will go away when you run it in a cluster. On Thu, Apr 9, 2015 at 11:24 AM, Nandan Tammineedi nan...@defend7.com wrote: Hi, I'm running a Spark Streaming job in local mode (--master local[4]), and I'm seeing tons of these messages, roughly once every second: WARN BlockManager: Block input-0-1428527584600 replicated to only 0 peer(s) instead of 1 peers We're using Spark 1.2.1. Even with TRACE logging enabled, we're not seeing any log messages indicating failure to replicate the blocks. Should we be concerned about this warning (and if so, how should we debug this), or is this a corner case in local mode where replication is not attempted, but the warning is emitted anyway? If so, what is the workaround? thanks Nandan
Re: Timeout errors from Akka in Spark 1.2.1
There are a couple of options. Increase the timeout (see Spark configuration). Also see past mails in the mailing list. Another option you may try (I have a gut feeling that it may work, but I am not sure) is calling GC on the driver periodically. The cleanup of stuff is tied to GCing of RDD objects, and regular cleaning may help keep things clean more rigorously, rather than in unpredictable bursts of GC activity. Let us know how it works out. TD On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal nb.nos...@gmail.com wrote: I have a standalone and local Spark Streaming process where we are reading inputs using FlumeUtils. Our longest window size is 6 hours. After about a day and a half of running without any issues, we start seeing timeout errors while cleaning up input blocks. This seems to cause reading from Flume to cease. ERROR sparkDriver-akka.actor.default-dispatcher-78 BlockManagerSlaveActor.logError - Error in removing block input-0-1428182594000 org.apache.spark.SparkException: Error sending message [message = UpdateBlockInfo(BlockManagerId(driver, localhost, 55067),input-0-1428182594000,StorageLevel(false, false, false, false, 1),0,0,0)] at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201) at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221) at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385) at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361) at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43) at
org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169) at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640) at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) ... 17 more There was a similar query posted here http://apache-spark-user-list.1001560.n3.nabble.com/Block-removal-causes-Akka-timeouts-td15632.html but did not find any resolution to that issue. Thanks in advance, NB
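TD suggests calling GC on the driver periodically. That could look like the sketch below: a single daemon thread that requests a GC at a fixed interval. `DriverGc` and `startPeriodicGc` are illustrative names, and the `gc` parameter exists only so the scheduling can be tested. The right interval for this workload is not known. Note that `System.gc()` is only a request, and the JVM may ignore it, e.g. when started with -XX:+DisableExplicitGC.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

object DriverGc {
  // Starts one daemon thread that calls `gc` every `periodMs` milliseconds and
  // returns the scheduler so the caller can shut it down later.
  def startPeriodicGc(periodMs: Long, gc: () => Unit = () => System.gc()): ScheduledExecutorService = {
    val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "driver-periodic-gc")
        t.setDaemon(true) // don't keep the driver JVM alive just for this thread
        t
      }
    })
    scheduler.scheduleAtFixedRate(new Runnable { def run(): Unit = gc() }, periodMs, periodMs, TimeUnit.MILLISECONDS)
    scheduler
  }
}
```

For example, you might call `val gcScheduler = DriverGc.startPeriodicGc(10 * 60 * 1000L)` right after creating the StreamingContext, and `gcScheduler.shutdown()` when stopping it. The 10-minute period is an arbitrary example.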
Re: Timeout errors from Akka in Spark 1.2.1
Yes, in local mode the driver and executor will be the same process. In that case, the executor Java options set in SparkConf (such as spark.executor.extraJavaOptions) will not take effect, since no separate executor JVM is launched. On Wed, Apr 8, 2015 at 1:44 PM, N B nb.nos...@gmail.com wrote: Since we are running in local mode, won't all the executors be in the same JVM as the driver? Thanks NB On Wed, Apr 8, 2015 at 1:29 PM, Tathagata Das t...@databricks.com wrote: It does take effect on the executors, not on the driver. That is okay, because the executors hold all the data and therefore have the GC issues, which is not usually the case for the driver. If you want to be doubly sure, print the JVM flags (e.g. http://stackoverflow.com/questions/10486375/print-all-jvm-flags). However, the GC I was referring to, the one that initiates the RDD and shuffle cleanup, is the GC on the driver. Thought I would clarify. TD On Wed, Apr 8, 2015 at 1:23 PM, N B nb.nos...@gmail.com wrote: Hi TD, Thanks for the response. Since you mentioned GC, this got me thinking. Given that we are running in local mode (all in a single JVM) for now, does the option spark.executor.extraJavaOptions set to -XX:+UseConcMarkSweepGC inside the SparkConf object take effect at all before we use it to create the StreamingContext? I ask because that is what we are doing right now. If not, perhaps we have not been running with Concurrent Mark Sweep at all. Is that recommended instead of forcing GC periodically? Thanks NB On Wed, Apr 8, 2015 at 10:20 AM, Tathagata Das t...@databricks.com wrote: There are a couple of options. One is to increase the timeout (see the Spark configuration page); also see past mails on the mailing list. Another option you may try (I have a gut feeling that it may work, but I am not sure) is calling GC on the driver periodically. The cleanup of this state is tied to the garbage collection of RDD objects, so regular GC may keep things clean more consistently, rather than in unpredictable bursts of GC activity. Let us know how it works out. 
TD On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal nb.nos...@gmail.com wrote: I have a standalone and local Spark Streaming process where we are reading inputs using FlumeUtils. Our longest window size is 6 hours. After about a day and a half of running without any issues, we start seeing timeout errors while cleaning up input blocks. This seems to cause reading from Flume to cease. ERROR sparkDriver-akka.actor.default-dispatcher-78 BlockManagerSlaveActor.logError - Error in removing block input-0-1428182594000 org.apache.spark.SparkException: Error sending message [message = UpdateBlockInfo(BlockManagerId(driver, localhost, 55067),input-0-1428182594000,StorageLevel(false, false, false, false, 1),0,0,0)] at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201) at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221) at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385) at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361) at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43) at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at 
akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169) at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640
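TD's suggestion to print the JVM flags can also be done from inside the application, which matters here because in local mode the driver and executor share one JVM. A minimal sketch using the standard `java.lang.management` API; `JvmFlagCheck` is a hypothetical name. With CMS enabled, HotSpot reports the old-generation collector as `ConcurrentMarkSweep` (note that CMS was removed in JDK 14, so on newer JVMs the check is always false).

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

// Reports how the current JVM was started and which collectors it is really using.
// In Spark local mode this is the single JVM shared by the driver and the executor.
final class JvmFlagCheck {
    // Command-line flags passed to this JVM (e.g. -Xmx4g, -XX:+UseConcMarkSweepGC).
    static List<String> jvmArguments() {
        return ManagementFactory.getRuntimeMXBean().getInputArguments();
    }

    // Names of the active garbage collectors, e.g. [ParNew, ConcurrentMarkSweep] under CMS.
    static List<String> collectorNames() {
        List<String> names = new ArrayList<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            names.add(gc.getName());
        }
        return names;
    }

    // True only if the CMS old-generation collector is actually running.
    static boolean usingCms() {
        return collectorNames().contains("ConcurrentMarkSweep");
    }

    public static void main(String[] args) {
        System.out.println("JVM arguments: " + jvmArguments());
        System.out.println("Collectors:    " + collectorNames());
        System.out.println("Using CMS:     " + usingCms());
    }
}
```

If `-XX:+UseConcMarkSweepGC` appears only inside `spark.executor.extraJavaOptions` and not in the JVM arguments printed here, the local-mode process is not running CMS.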
Re: Empty RDD?
Aah yes. The jsonRDD method needs to walk through the whole RDD to infer the schema, and does not work if there is no data in it. Checking first with take(1) whether the RDD has any data should work. TD
Re: Empty RDD?
What is the computation you are doing in foreachRDD that is throwing the exception? One way to guard against it is to call take(1) and see whether you get back any data. If there is none, don't do anything with the RDD. TD On Wed, Apr 8, 2015 at 1:08 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: When I call transform or foreachRDD on a DStream, I keep getting an error that I have an empty RDD, which makes sense since my batch interval may be smaller than the rate at which new data are coming in. How do I guard against it? Thanks, Vadim
Re: Spark + Kinesis
, batchInterval) /* Kinesis checkpoint interval. Same as batchInterval for this example. */ val kinesisCheckpointInterval = batchInterval /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */ val kinesisStreams = (0 until numStreams).map { i => KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval, InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2) } /* Union all the streams */ val unionStreams = ssc.union(kinesisStreams).map(byteArray => new String(byteArray)) unionStreams.print() ssc.start() ssc.awaitTermination() }} On Fri, Apr 3, 2015 at 3:48 PM, Tathagata Das t...@databricks.com wrote: Just remove "provided" for spark-streaming-kinesis-asl: libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks. So how do I fix it? On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan jonat...@amazon.com wrote: spark-streaming-kinesis-asl is not part of the Spark distribution on your cluster, so you cannot have it be just a "provided" dependency. This is also why the KCL and its dependencies were not included in the assembly (but yes, they should be). ~ Jonathan Kelly From: Vadim Bichutskiy vadim.bichuts...@gmail.com Date: Friday, April 3, 2015 at 12:26 PM To: Jonathan Kelly jonat...@amazon.com Cc: user@spark.apache.org user@spark.apache.org Subject: Re: Spark + Kinesis Hi all, Good news! 
I was able to create a Kinesis consumer and assemble it into an uber jar following http://spark.apache.org/docs/latest/streaming-kinesis-integration.html and the example https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala. However, when I try to spark-submit it, I get the following exception: Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider Do I need to include the KCL dependency in build.sbt? Here's what it looks like currently: import AssemblyKeys._ name := "Kinesis Consumer" version := "1.0" organization := "com.myconsumer" scalaVersion := "2.11.5" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided" libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided" libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided" assemblySettings jarName in assembly := "consumer-assembly.jar" assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala=false) Any help appreciated. Thanks, Vadim On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan jonat...@amazon.com wrote: It looks like you're attempting to mix Scala versions, so that's going to cause some problems. 
If you really want to use Scala 2.11.5, you must also use Spark package versions built for Scala 2.11 rather than 2.10. Anyway, that's not quite the correct way to specify Scala dependencies in build.sbt. Instead of placing the Scala version after the artifactId (like spark-core_2.10), what you actually want is to use just spark-core with two percent signs before it. Using two percent signs will make it use the version of Scala that matches your declared scalaVersion. For example: libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided" libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided" libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" I think that may get you a little closer, though I think you're probably going to run into the same problems I ran into in this thread: https://www.mail-archive.com/user@spark.apache.org/msg23891.html I never really got an answer for that, and I temporarily moved on to other things for now. ~ Jonathan Kelly From: 'Vadim Bichutskiy' vadim.bichuts...@gmail.com Date: Thursday, April 2, 2015 at 9:53 AM To: user@spark.apache.org user@spark.apache.org Subject: Spark + Kinesis Hi all, I am trying to write an Amazon Kinesis consumer Scala app that processes data in the Kinesis stream. Is this the correct way to specify build.sbt: --- import AssemblyKeys._ name := "Kinesis Consumer" version := 1.0
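Jonathan's point about `%%` can be made concrete. For Scala 2.10 and later final releases, sbt's `%%` appends the Scala binary version (the major.minor part of `scalaVersion`) to the artifact name, so with `scalaVersion := "2.11.5"` a `%%` dependency resolves to an `_2.11` artifact such as `spark-core_2.11`. The sketch below models only that naming rule in plain Java; `SbtCrossVersion` is a hypothetical name, it is not sbt code, and it ignores pre-release versions such as milestones.

```java
// Models the artifact-name rule behind sbt's "%%" for Scala 2.10+ final releases:
// artifactId + "_" + <Scala binary version>. Hypothetical helper, not sbt code.
final class SbtCrossVersion {
    // "2.11.5" -> "2.11", "2.10.4" -> "2.10" (assumes a major.minor[.patch] version string).
    static String scalaBinaryVersion(String scalaVersion) {
        String[] parts = scalaVersion.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected major.minor[.patch], got " + scalaVersion);
        }
        return parts[0] + "." + parts[1];
    }

    // "org.apache.spark" %% "spark-core" with scalaVersion 2.11.5 -> "spark-core_2.11".
    static String crossArtifactId(String artifactId, String scalaVersion) {
        return artifactId + "_" + scalaBinaryVersion(scalaVersion);
    }

    public static void main(String[] args) {
        String scalaVersion = "2.11.5"; // the scalaVersion declared in build.sbt above
        String[] artifacts = {"spark-core", "spark-streaming", "spark-streaming-kinesis-asl"};
        for (String artifact : artifacts) {
            System.out.println(artifact + " -> " + crossArtifactId(artifact, scalaVersion));
        }
    }
}
```

Hard-coding `spark-core_2.10` next to `scalaVersion := "2.11.5"` breaks exactly this correspondence, which is the version mixing described in the reply.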
Re: WordCount example
There are no workers registered with the Spark Standalone master! That is the crux of the problem. :) Follow the instructions carefully: https://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts In particular, make sure the conf/slaves file lists the intended workers. TD On Mon, Apr 6, 2015 at 9:55 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Interesting, I see 0 cores in the UI? - Cores: 0 Total, 0 Used On Fri, Apr 3, 2015 at 2:55 PM, Tathagata Das t...@databricks.com wrote: What does the Spark Standalone UI at port 8080 say about the number of cores? On Fri, Apr 3, 2015 at 2:53 PM, Mohit Anchlia mohitanch...@gmail.com wrote: [ec2-user@ip-10-241-251-232 s_lib]$ cat /proc/cpuinfo |grep process processor : 0 processor : 1 processor : 2 processor : 3 processor : 4 processor : 5 processor : 6 processor : 7 On Fri, Apr 3, 2015 at 2:33 PM, Tathagata Das t...@databricks.com wrote: How many cores are present in the workers allocated to the standalone cluster spark://ip-10-241-251-232:7077 ? On Fri, Apr 3, 2015 at 2:18 PM, Mohit Anchlia mohitanch...@gmail.com wrote: If I use local[2] instead of URL: spark://ip-10-241-251-232:7077, this seems to work. I don't understand why, though, because when I give spark://ip-10-241-251-232:7077 the application seems to bootstrap successfully; it just doesn't create a socket on port ? On Fri, Mar 27, 2015 at 10:55 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I checked the ports using netstat and don't see any connections established on that port. 
Logs show only this: 15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount 15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with ID app-20150327135048-0002 Spark UI shows, under Running Applications: ID app-20150327135048-0002 (http://54.69.225.94:8080/app?appId=app-20150327135048-0002), Name NetworkWordCount (http://ip-10-241-251-232.us-west-2.compute.internal:4040/), Cores 0, Memory per Node 512.0 MB, Submitted Time 2015/03/27 13:50:48, User ec2-user, State WAITING, Duration 33 s. The code appears to be executing: java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077 public static void doWork(String masterUrl) { SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", ); System.out.println("Successfully created connection"); mapAndReduce(lines); jssc.start(); // Start the computation jssc.awaitTermination(); // Wait for the computation to terminate } public static void main(String... args) { doWork(args[0]); } And the output of the Java program after submitting the task: java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ec2-user); users with modify permissions: Set(ec2-user) 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started 15/03/27 13:50:46 INFO Remoting: Starting remoting 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@ip-10-241-251-232.us-west-2.compute.internal:60184] 15/03/27 13:50:47 INFO Utils: Successfully started service 
'sparkDriver' on port 60184. 15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150327135047-5399 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity 3.5 GB 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file server' on port 57955. 15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at http://ip-10-241-251-232.us-west-2.compute.internal:4040 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master spark://ip-10-241-251-232:7077... 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150327135048-0002 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on 58358 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register
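The resolution in this thread comes down to core counting. In Spark Streaming each receiver (here, the one behind socketTextStream) occupies one of the application's cores for as long as it runs, so the application needs strictly more cores than receivers. With local[2] that leaves one core for processing; with a standalone master whose UI reports "Cores: 0 Total", even the receiver cannot be scheduled, which would be consistent with no connection ever appearing on the socket port. A minimal sketch of that arithmetic; `StreamingCoreCheck` and its methods are hypothetical helpers, not Spark APIs, and `localCores` deliberately does not handle the `local[N,F]` form.

```java
// Hypothetical helpers (not Spark APIs) for the "more cores than receivers" rule.
final class StreamingCoreCheck {
    // Cores implied by "local", "local[N]" or "local[*]"; -1 for cluster URLs such as
    // spark://host:7077, where the count depends on the workers that actually registered.
    static int localCores(String master) {
        if (master.equals("local")) {
            return 1;
        }
        if (master.startsWith("local[") && master.endsWith("]")) {
            String n = master.substring("local[".length(), master.length() - 1);
            return n.equals("*") ? Runtime.getRuntime().availableProcessors() : Integer.parseInt(n);
        }
        return -1;
    }

    // Each receiver pins one core, so at least one extra core is needed to process batches.
    static boolean canProcessBatches(int totalCores, int numReceivers) {
        return totalCores > numReceivers;
    }

    public static void main(String[] args) {
        int receivers = 1; // a single socketTextStream, as in the WordCount code above
        System.out.println("local[2]: " + canProcessBatches(localCores("local[2]"), receivers)); // true
        System.out.println("local:    " + canProcessBatches(localCores("local"), receivers));    // false
        // Standalone master with no registered workers ("Cores: 0 Total" in the UI).
        System.out.println("0 cores:  " + canProcessBatches(0, receivers));                     // false
    }
}
```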
Re: How to restrict foreach on a streaming RDD only once upon receiver completion
So you want to sort based on the total count of all the records received through the receiver? In that case, you have to combine all the counts using updateStateByKey ( https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala ). But stepping back, if you want to get the final results at the end of receiving all the data (as opposed to continuously), why are you even using streaming? You could create a custom RDD that reads from ElasticSearch and then use it in a Spark program. I think that's more natural, as your application is more batch-like than streaming-like: you are not using the results in real time. TD On Mon, Apr 6, 2015 at 12:31 PM, Hari Polisetty hpoli...@icloud.com wrote: I have created a custom receiver to fetch records pertaining to a specific query from Elastic Search and have implemented streaming RDD transformations to process the data generated by the receiver. The final RDD is a sorted list of name-value pairs, and I want to read the top 20 results programmatically rather than write them to an external file. I use foreach on the RDD and take the top 20 values into a list. I see that foreach is processed every time there is a new micro-batch from the receiver. However, I want the foreach computation to be done only once, when the receiver has finished fetching all the records from Elastic Search and before the streaming context is killed, so that I can populate the results into a list and process it in my driver program. Appreciate any guidance in this regard.
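TD's first suggestion, combining the counts with updateStateByKey, amounts to folding every micro-batch into per-key running totals; only once the last batch has been folded in is a top-20 over those totals meaningful. The sketch below is a plain-JDK model of that fold, not the Spark API: `RunningTopN` and its methods are hypothetical, and Spark 1.x's Java `updateStateByKey` takes Guava's `Optional` rather than the `java.util.Optional` used here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-JDK model (not the Spark API) of what updateStateByKey does per key on every
// micro-batch: newTotal = previousTotal + sum(counts seen in this batch).
final class RunningTopN {
    private final Map<String, Long> totals = new HashMap<>();

    // Same shape as an updateStateByKey update function: (new values, old state) -> new state.
    static Optional<Long> update(List<Long> newCounts, Optional<Long> state) {
        long sum = state.orElse(0L);
        for (long c : newCounts) {
            sum += c;
        }
        return Optional.of(sum);
    }

    // Folds one micro-batch of name -> counts into the running totals.
    // Names absent from this batch simply keep their previous total.
    void mergeBatch(Map<String, List<Long>> batch) {
        for (Map.Entry<String, List<Long>> e : batch.entrySet()) {
            Optional<Long> previous = Optional.ofNullable(totals.get(e.getKey()));
            update(e.getValue(), previous).ifPresent(v -> totals.put(e.getKey(), v));
        }
    }

    long total(String name) {
        return totals.getOrDefault(name, 0L);
    }

    // Top n names by total, highest first; meant to be read once, after the last batch.
    List<String> top(int n) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(totals.entrySet());
        entries.sort(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()));
        List<String> names = new ArrayList<>();
        for (int i = 0; i < Math.min(n, entries.size()); i++) {
            names.add(entries.get(i).getKey());
        }
        return names;
    }

    public static void main(String[] args) {
        RunningTopN state = new RunningTopN();
        state.mergeBatch(Map.of("spark", List.of(3L), "kafka", List.of(1L, 1L)));
        state.mergeBatch(Map.of("kafka", List.of(5L), "flume", List.of(2L)));
        System.out.println(state.top(20)); // [kafka, spark, flume]
    }
}
```

Taking a top-20 inside each batch's foreach, as described in the question, would rank only that batch's counts, which is why the per-batch results first have to be merged like this.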
Re: Spark Application Stages and DAG
What he meant is that you should look it up in the Spark UI, specifically in the Stages tab, to see what is taking so long. And yes, a code snippet helps us debug. TD On Fri, Apr 3, 2015 at 12:47 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You need to open the page of the stage that is taking time and see how long it is spending on GC etc. Also, it would be good to post the code snippet for that stage and its preceding transformations so that we can understand it better. Thanks Best Regards On Fri, Apr 3, 2015 at 1:05 PM, Vijay Innamuri vijay.innam...@gmail.com wrote: When I run the Spark application (streaming) in local mode, I can see the execution progress as below: [Stage 0: (1817 + 1) / 3125] [Stage 2:=== (740 + 1) / 3125] One of the stages is taking a long time to execute. How do I find the transformations/actions associated with a particular stage? Is there any way to find the execution DAG of a Spark application? Regards Vijay
Re: About Waiting batches on the spark streaming UI
Very good question! This is because the current code is written such that the UI considers a batch as waiting only when it has actually started being processed. That is, batches waiting in the job queue are not counted. It is arguable that it would be more intuitive to count those as waiting as well. On Apr 3, 2015 12:59 AM, bit1...@163.com bit1...@163.com wrote: I copied the following from the Spark Streaming UI. I don't know why Waiting batches is 1; my understanding is that it should be 72. Following is my understanding: 1. Total time is 1 minute 35 seconds = 95 seconds. 2. Batch interval is 1 second, so 95 batches are generated in 95 seconds. 3. Processed batches are 23 (correct, because my processing code does nothing but sleep 4 seconds). 4. Then the waiting batches should be 95 - 23 = 72. - Started at: Fri Apr 03 15:17:47 CST 2015 - Time since start: 1 minute 35 seconds - Network receivers: 1 - Batch interval: 1 second - Processed batches: 23 - Waiting batches: 1 - Received records: 0 - Processed records: 0 -- bit1...@163.com
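The numbers in the question can be reproduced with a small queue model, which also shows where the other 71 batches are. This is a simplified sketch, not Spark code: it assumes the first batch is generated at t = 1 s, that batches run strictly one after another, and that scheduling overhead is zero; `BatchQueueModel` is a hypothetical name. Under those assumptions, at t = 95 s, 23 batches have finished, 1 has started (the single batch the UI reports as waiting), and 71 are still in the job queue, so 1 + 71 gives the 72 the poster expected.

```java
// Simplified model (not Spark code) of serial batch processing: one batch is generated
// every intervalSec seconds starting at t = intervalSec, and batches run one at a time,
// each taking processSec seconds. Scheduling overhead is ignored.
final class BatchQueueModel {
    static final class Snapshot {
        final long completed; // finished batches ("Processed batches" in the UI)
        final long running;   // started but unfinished (what the UI above counts as "Waiting")
        final long queued;    // generated but not yet started (not counted by the UI above)

        Snapshot(long completed, long running, long queued) {
            this.completed = completed;
            this.running = running;
            this.queued = queued;
        }

        long generated() {
            return completed + running + queued;
        }

        @Override
        public String toString() {
            return "completed=" + completed + ", running=" + running + ", queued=" + queued;
        }
    }

    // State of the batch queue at time t (seconds since start).
    static Snapshot at(long t, long intervalSec, long processSec) {
        long completed = 0, running = 0, queued = 0;
        long previousEnd = 0;
        for (long k = 1; k * intervalSec <= t; k++) {
            long start = Math.max(k * intervalSec, previousEnd); // wait for the previous batch
            long end = start + processSec;
            previousEnd = end;
            if (end <= t) {
                completed++;
            } else if (start <= t) {
                running++;
            } else {
                queued++;
            }
        }
        return new Snapshot(completed, running, queued);
    }

    public static void main(String[] args) {
        // From the question: 95 s since start, 1 s batch interval, each batch sleeps 4 s.
        System.out.println(at(95, 1, 4)); // completed=23, running=1, queued=71
    }
}
```

Because each batch takes four times the batch interval, the queue grows by roughly three batches per four seconds and never drains; the UI as described only hides that backlog.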
Re: About Waiting batches on the spark streaming UI
Maybe that should be marked as waiting as well. We plan to update the UI soon, so we will keep that in mind. On Apr 3, 2015 10:12 AM, Ted Yu yuzhih...@gmail.com wrote: Maybe add another stat for batches waiting in the job queue? Cheers On Fri, Apr 3, 2015 at 10:01 AM, Tathagata Das t...@databricks.com wrote: Very good question! This is because the current code is written such that the UI considers a batch as waiting only when it has actually started being processed. That is, batches waiting in the job queue are not counted. It is arguable that it would be more intuitive to count those as waiting as well. On Apr 3, 2015 12:59 AM, bit1...@163.com bit1...@163.com wrote: I copied the following from the Spark Streaming UI. I don't know why Waiting batches is 1; my understanding is that it should be 72. Following is my understanding: 1. Total time is 1 minute 35 seconds = 95 seconds. 2. Batch interval is 1 second, so 95 batches are generated in 95 seconds. 3. Processed batches are 23 (correct, because my processing code does nothing but sleep 4 seconds). 4. Then the waiting batches should be 95 - 23 = 72. - Started at: Fri Apr 03 15:17:47 CST 2015 - Time since start: 1 minute 35 seconds - Network receivers: 1 - Batch interval: 1 second - Processed batches: 23 - Waiting batches: 1 - Received records: 0 - Processed records: 0 -- bit1...@163.com
Re: WordCount example
What does the Spark Standalone UI at port 8080 say about the number of cores? On Fri, Apr 3, 2015 at 2:53 PM, Mohit Anchlia mohitanch...@gmail.com wrote: [ec2-user@ip-10-241-251-232 s_lib]$ cat /proc/cpuinfo |grep process processor : 0 processor : 1 processor : 2 processor : 3 processor : 4 processor : 5 processor : 6 processor : 7 On Fri, Apr 3, 2015 at 2:33 PM, Tathagata Das t...@databricks.com wrote: How many cores are present in the workers allocated to the standalone cluster spark://ip-10-241-251-232:7077 ? On Fri, Apr 3, 2015 at 2:18 PM, Mohit Anchlia mohitanch...@gmail.com wrote: If I use local[2] instead of URL: spark://ip-10-241-251-232:7077, this seems to work. I don't understand why, though, because when I give spark://ip-10-241-251-232:7077 the application seems to bootstrap successfully; it just doesn't create a socket on port ? On Fri, Mar 27, 2015 at 10:55 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I checked the ports using netstat and don't see any connections established on that port. 
Logs show only this: 15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount 15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with ID app-20150327135048-0002 Spark UI shows, under Running Applications: ID app-20150327135048-0002 (http://54.69.225.94:8080/app?appId=app-20150327135048-0002), Name NetworkWordCount (http://ip-10-241-251-232.us-west-2.compute.internal:4040/), Cores 0, Memory per Node 512.0 MB, Submitted Time 2015/03/27 13:50:48, User ec2-user, State WAITING, Duration 33 s. The code appears to be executing: java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077 public static void doWork(String masterUrl) { SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", ); System.out.println("Successfully created connection"); mapAndReduce(lines); jssc.start(); // Start the computation jssc.awaitTermination(); // Wait for the computation to terminate } public static void main(String... args) { doWork(args[0]); } And the output of the Java program after submitting the task: java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ec2-user); users with modify permissions: Set(ec2-user) 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started 15/03/27 13:50:46 INFO Remoting: Starting remoting 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@ip-10-241-251-232.us-west-2.compute.internal:60184] 15/03/27 13:50:47 INFO Utils: Successfully started service 
'sparkDriver' on port 60184. 15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150327135047-5399 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity 3.5 GB 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file server' on port 57955. 15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at http://ip-10-241-251-232.us-west-2.compute.internal:4040 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master spark://ip-10-241-251-232:7077... 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150327135048-0002 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on 58358 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register BlockManager 15/03/27 13:50:48 INFO BlockManagerMasterActor: Registering block manager ip-10-241-251-232.us-west-2.compute.internal:58358 with 3.5 GB RAM, BlockManagerId(driver, ip-10-241-251-232.us-west-2.compute.internal, 58358) 15/03/27 13:50:48 INFO BlockManagerMaster: Registered BlockManager 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 15/03/27 13:50:48 INFO ReceiverTracker: ReceiverTracker started 15