Which version of Spark Streaming are you using? When the batch processing time increases to 15-20 seconds, could you compare the task times against the task times when the application has just been launched? Basically, is the increase from 6 seconds to 15-20 seconds caused by an increase in computation, or by GC pauses, etc.?
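If it helps, a StreamingListener can log the processing time and scheduling delay of every batch, so you can see exactly when they start to grow and correlate that with the GC logs you are already printing. This is only a rough, untested sketch against the 1.x StreamingListener API (the class name is just a placeholder):

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Logs per-batch timing so the growth in processing time / scheduling delay
// can be tracked over the lifetime of the job.
class BatchTimingListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(s"batch=${info.batchTime} " +
      s"processingMs=${info.processingDelay.getOrElse(-1L)} " +
      s"schedulingMs=${info.schedulingDelay.getOrElse(-1L)}")
  }
}

// Register it right after creating the StreamingContext:
// ssc.addStreamingListener(new BatchTimingListener)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
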
On Tue, Dec 30, 2014 at 1:41 PM, RK <prk...@yahoo.com.invalid> wrote:
> Here is the code for my streaming job.
>
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> val sparkConf = new SparkConf().setAppName("SparkStreamingJob")
> sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> sparkConf.set("spark.default.parallelism", "100")
> sparkConf.set("spark.shuffle.consolidateFiles", "true")
> sparkConf.set("spark.speculation", "true")
> sparkConf.set("spark.speculation.interval", "5000")
> sparkConf.set("spark.speculation.quantile", "0.9")
> sparkConf.set("spark.speculation.multiplier", "3")
> sparkConf.set("spark.mesos.coarse", "true")
> sparkConf.set("spark.executor.extraJavaOptions", "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC")
> sparkConf.set("spark.shuffle.manager", "SORT")
>
> val ssc = new StreamingContext(sparkConf, Seconds(10))
> ssc.checkpoint(checkpointDir)
>
> val topics = "trace"
> val numThreads = 1
> val topicMap = topics.split(",").map((_, numThreads)).toMap
>
> val kafkaPartitions = 20
> val kafkaDStreams = (1 to kafkaPartitions).map { _ =>
>   KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
> }
>
> val lines = ssc.union(kafkaDStreams)
> val words = lines.map(line => doSomething_1(line))
> val filteredWords = words.filter(word => word != "test")
> val groupedWords = filteredWords.map(word => (word, 1))
>
> val windowedWordCounts = groupedWords.reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10))
> val windowedWordsFiltered = windowedWordCounts.filter { case (word, count) => count > 50 }
> val finalResult = windowedWordsFiltered.foreachRDD(words => doSomething_2(words))
>
> ssc.start()
> ssc.awaitTermination()
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>
> I am running this job on a 9-slave AWS EC2 cluster where each slave node has 32 vCPUs & 60GB of memory.
>
> When I start this job, the processing time is usually around 5 - 6 seconds for the 10-second batches and the scheduling delay is around 0 seconds or a few ms. However, as the job runs for 6 - 8 hours, the processing time increases to 15 - 20 seconds and the scheduling delay grows to 4 - 6 hours.
>
> When I look at the completed stages, I see that the time taken for getCallSite at DStream.scala:294 keeps increasing as time passes. It goes from around 2 seconds to more than a few minutes.
>
> Clicking on +details next to this stage description shows the following execution trace.
> org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1088)
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:294)
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
> scala.Option.orElse(Option.scala:257)
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:285)
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> scala.util.Try$.apply(Try.scala:161)
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
>
> When I click on one of these slow stages that executed after 6 - 8 hours, I find the following information for the individual tasks inside:
> - All tasks seem to execute with PROCESS_LOCAL locality.
> - Quite a few of these tasks seem to spend anywhere between 30 - 80% of their time in GC, even though when I look at the total memory usage on each of the slave nodes under the executors information, I see that the usage is only around 200MB out of the 20GB available.
>
> Even after a few hours, the map stages (val groupedWords = filteredWords.map(word => (word, 1))) seem to have times consistent with the start of the job, which seems to indicate that this code is fine. Also, the number of waiting batches is either 0 or 1 even after 8 to 10 hours.
>
> Based on the fact that map is as fast as at the start of the job and that there are no waiting batches, I am assuming that the getCallSite stages correspond to getting data out of Kafka. Is this correct or not?
> If my assumption is correct, is there anything that I could do to optimize receiving data from Kafka?
> If not, which part of my code needs to be optimized to reduce the scheduling delay?
>
> Thanks,
> RK
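Since you already see tasks spending 30 - 80% of their time in GC, one way to quantify that over the whole run without clicking through every stage in the UI is a SparkListener that logs the GC time per task. Again, only a rough, untested sketch against the 1.x listener API (the class name is a placeholder):

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Logs how much of each task's run time was spent in GC, so a growing
// GC fraction over the hours shows up directly in the driver log.
class GcTimeListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {
      val runMs = metrics.executorRunTime
      val gcMs = metrics.jvmGCTime
      val gcPct = if (runMs > 0) 100.0 * gcMs / runMs else 0.0
      println(f"stage=${taskEnd.stageId} runMs=$runMs gcMs=$gcMs gcPct=$gcPct%.1f%%")
    }
  }
}

// Register it on the underlying SparkContext:
// ssc.sparkContext.addSparkListener(new GcTimeListener)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If the GC time tracks the growth in task time, the GC logs you are already collecting would be the next place to look; if it does not, the slowdown is coming from the computation itself.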