Which version of Spark Streaming are you using?

When the batch processing time increases to 15-20 seconds, could you
compare the task times with the task times when the application has just
been launched? Basically, is the increase from 6 seconds to 15-20 seconds
caused by an increase in computation or by GC pauses, etc.?
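
If it helps, something along these lines (an untested sketch; register the
listeners before ssc.start()) would log the per-batch processing and
scheduling delays plus the per-task GC time to the driver logs, so you can
compare the numbers right after launch with the numbers after 6 - 8 hours:

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.scheduler.SparkListenerTaskEnd
import org.apache.spark.streaming.scheduler.StreamingListener
import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted

// Per-batch trend: processing time vs. scheduling delay.
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(
      batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(s"batch=${info.batchTime} " +
      s"processingMs=${info.processingDelay.getOrElse(-1L)} " +
      s"schedulingMs=${info.schedulingDelay.getOrElse(-1L)}")
  }
})

// Per-task breakdown: total run time vs. time spent in GC.
ssc.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {
      println(s"stage=${taskEnd.stageId} " +
        s"runMs=${metrics.executorRunTime} gcMs=${metrics.jvmGCTime}")
    }
  }
})
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If jvmGCTime grows along with executorRunTime, the slowdown is GC; if
executorRunTime grows on its own, the computation itself is getting more
expensive.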

On Tue, Dec 30, 2014 at 1:41 PM, RK <prk...@yahoo.com.invalid> wrote:
> Here is the code for my streaming job.
>
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> val sparkConf = new SparkConf().setAppName("SparkStreamingJob")
> sparkConf.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
> sparkConf.set("spark.default.parallelism", "100")
> sparkConf.set("spark.shuffle.consolidateFiles", "true")
> sparkConf.set("spark.speculation", "true")
> sparkConf.set("spark.speculation.interval", "5000")
> sparkConf.set("spark.speculation.quantile", "0.9")
> sparkConf.set("spark.speculation.multiplier", "3")
> sparkConf.set("spark.mesos.coarse", "true")
> sparkConf.set("spark.executor.extraJavaOptions", "-verbose:gc
> -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC")
> sparkConf.set("spark.shuffle.manager", "SORT")
>
> val ssc = new StreamingContext(sparkConf, Seconds(10))
> ssc.checkpoint(checkpointDir)
>
> val topics = "trace"
> val numThreads = 1
> val topicMap = topics.split(",").map((_,numThreads)).toMap
>
> val kafkaPartitions = 20
> val kafkaDStreams = (1 to kafkaPartitions).map { _ =>
>   KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
> }
>
> val lines = ssc.union(kafkaDStreams)
> val words = lines.map(line => doSomething_1(line))
> val filteredWords = words.filter(word => word != "test")
> val groupedWords = filteredWords.map(word => (word, 1))
>
> val windowedWordCounts = groupedWords.reduceByKeyAndWindow(_ + _, _ - _,
> Seconds(30), Seconds(10))
> val windowedWordsFiltered = windowedWordCounts.filter{case (word, count) =>
> count > 50}
> val finalResult = windowedWordsFiltered.foreachRDD(words =>
> doSomething_2(words))
>
> ssc.start()
> ssc.awaitTermination()
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>
> I am running this job on a 9-slave AWS EC2 cluster where each slave node
> has 32 vCPUs & 60GB of memory.
>
> When I start this job, the processing time is usually around 5 - 6 seconds
> for the 10 second batches and the scheduling delay is around 0 seconds or a
> few ms. However, after the job has run for 6 - 8 hours, the processing time
> increases to 15 - 20 seconds while the scheduling delay grows to 4 - 6
> hours.
>
> When I look at the completed stages, I see that the time taken for
> getCallSite at DStream.scala:294 keeps increasing over time. It goes
> from around 2 seconds to more than a few minutes.
>
> Clicking on +details next to this stage description shows the following
> execution trace.
> org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1088)
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:294)
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
> scala.Option.orElse(Option.scala:257)
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:285)
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
> scala.util.Try$.apply(Try.scala:161)
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
>
> When I click on one of these slow stages that executed after 6 - 8 hours, I
> find the following information for the individual tasks inside:
> - All tasks seem to execute with PROCESS_LOCAL locality.
> - Quite a few of these tasks seem to spend anywhere between 30 - 80% of
> their time in GC. However, when I look at the total memory usage on each of
> the slave nodes under the executors information, I see that the usage is
> only around 200MB out of the 20GB available.
>
> Even after a few hours, the map stages (val groupedWords =
> filteredWords.map(word => (word, 1))) seem to take about the same time as
> at the start of the job, which seems to indicate that this code is fine.
> Also, the number of waiting batches is either 0 or 1 even after 8 to 10
> hours.
>
> Based on the fact that map is as fast as at the start of the job and that
> there are no waiting batches, I am assuming that the getCallSite stages
> correspond to getting data out of Kafka. Is this correct or not?
> If my assumption is correct, is there anything that I could do to optimize
> receiving data from Kafka?
> If not, which part of my code needs to be optimized to reduce the
> scheduling delay?
>
> Thanks,
> RK
>
