[ https://issues.apache.org/jira/browse/SPARK-7624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-7624:
-----------------------------------

    Assignee: Davies Liu  (was: Apache Spark)

Task scheduler delay is increasing time over time in spark local mode
---------------------------------------------------------------------

    Key: SPARK-7624
    URL: https://issues.apache.org/jira/browse/SPARK-7624
    Project: Spark
    Issue Type: Bug
    Components: Spark Core
    Affects Versions: 1.3.1
    Reporter: Jack Hu
    Assignee: Davies Liu
    Labels: delay, schedule

I am running a simple Spark Streaming program with Spark 1.3.1 in local mode. It receives JSON strings from a socket at a rate of 50 events per second. It runs well for the first 6 hours, although the minor GC count per minute keeps increasing the whole time. After that, the scheduler delay of every task increases significantly, from 10 ms to 100 ms. After 10 hours of running, the task delay is about 800 ms and CPU usage has risen from 2% to 30%. As a result, the streaming job can no longer finish within one batch interval (5 seconds). I took a Java heap dump after 16 hours and found about 200000 {{org.apache.spark.scheduler.local.ReviveOffers}} objects in {{akka.actor.LightArrayRevolverScheduler$TaskQueue[]}}.
Then I checked the code and found only one place that may put {{ReviveOffers}} into Akka's {{LightArrayRevolverScheduler}}: {{LocalActor::reviveOffers}}
{code}
def reviveOffers() {
  val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
  val tasks = scheduler.resourceOffers(offers).flatten
  for (task <- tasks) {
    freeCores -= scheduler.CPUS_PER_TASK
    executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
      task.name, task.serializedTask)
  }
  if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
    // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
    context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
  }
}
{code}
I removed the last three lines of this method (the whole {{if}} block, which was introduced by https://issues.apache.org/jira/browse/SPARK-4939), and the job then ran smoothly for 20 hours, with the scheduler delay staying at about 10 ms the whole time. So is there some condition under which {{ReviveOffers}} gets scheduled more than once (duplicated)? I am not sure why this happens, but I believe it is the root cause of this issue.

My Spark settings:
# Memory: 3G
# CPU: 8 cores
# Streaming batch interval: 5 seconds
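One plausible way the {{if}} block above could pile up {{ReviveOffers}} messages can be sketched as a toy model. This is a hypothetical simulation, not Spark or Akka code: {{ReviveModel}}, {{externalRevive}}, {{tick}} and {{simulate}} are made-up names, time advances in whole seconds to match the 1000 ms delay, and the model assumes some task set stays active while no task can be launched (a long-running receiver task is one possible candidate, but that is an assumption). Under those conditions every delivered {{ReviveOffers}} schedules its own successor, so each extra call to {{reviveOffers()}} starts another retry chain that never ends. The {{guarded}} variant shows one hypothetical mitigation (at most one pending retry); it is not presented as the fix applied in Spark.

```scala
// Hypothetical toy model of the retry branch in LocalActor.reviveOffers (NOT Spark code).
// Assumption: a task set stays active and no task can be launched, so every call to
// reviveOffers() takes the "tasks.isEmpty && activeTaskSets.nonEmpty" branch.
final class ReviveModel(guarded: Boolean) {
  private var pendingTimers = 0      // ReviveOffers messages waiting in the timer
  private var retryScheduled = false // only consulted by the guarded variant
  private var calls = 0L             // how many times scheduleOnce was invoked

  def pending: Int = pendingTimers
  def scheduleOnceCalls: Long = calls

  // Models reviveOffers() launching nothing: schedule a retry 1 second later.
  // The guarded variant skips scheduling if a retry is already pending.
  private def reviveOffersLaunchingNothing(): Unit =
    if (!guarded || !retryScheduled) {
      retryScheduled = true
      pendingTimers += 1
      calls += 1
    }

  // A trigger from outside the timer (assumed, e.g., a task status update
  // or a new task submission) that calls reviveOffers() directly.
  def externalRevive(): Unit = reviveOffersLaunchingNothing()

  // One second passes: every pending ReviveOffers is delivered, and each delivery
  // runs reviveOffers() again, which schedules yet another retry.
  def tick(): Unit = {
    val firing = pendingTimers
    pendingTimers = 0
    retryScheduled = false
    for (_ <- 1 to firing) reviveOffersLaunchingNothing()
  }
}

// Runs the model for `seconds` seconds with `revivesPerSecond` external triggers per second.
def simulate(guarded: Boolean, seconds: Int, revivesPerSecond: Int): ReviveModel = {
  val model = new ReviveModel(guarded)
  for (_ <- 1 to seconds) {
    for (_ <- 1 to revivesPerSecond) model.externalRevive()
    model.tick()
  }
  model
}

val unguarded = simulate(guarded = false, seconds = 10, revivesPerSecond = 3)
val guarded = simulate(guarded = true, seconds = 10, revivesPerSecond = 3)
println(s"unguarded: pending=${unguarded.pending}, scheduleOnce calls=${unguarded.scheduleOnceCalls}")
// prints: unguarded: pending=30, scheduleOnce calls=195
println(s"guarded: pending=${guarded.pending}, scheduleOnce calls=${guarded.scheduleOnceCalls}")
// prints: guarded: pending=1, scheduleOnce calls=11
```

In the unguarded model the number of pending retries equals the number of external triggers received so far, so both the timer queue and the per-second scheduling work grow without bound; in the guarded model a single retry is pending at any time.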
Here is my streaming code:
{code}
val input = ssc.socketTextStream(
  hostname, port, StorageLevel.MEMORY_ONLY_SER).mapPartitions(
  /// parse the json to Order
  Order(_), preservePartitioning = true)
val mresult = input.map(
  v => (v.customer, UserSpending(v.customer, v.count * v.price, v.timestamp.toLong))).cache()
val tempr = mresult.window(
  Seconds(firstStageWindowSize),
  Seconds(firstStageWindowSize)
).transform(
  rdd => rdd.union(rdd).union(rdd).union(rdd)
)
tempr.count.print
tempr.cache().foreachRDD((rdd, t) => {
  for (i <- 1 to 5) {
    val c = rdd.filter(x => scala.util.Random.nextInt(5) == i).count()
    println("""T: """ + t + """: """ + c)
  }
})
{code}

========================================================
Updated at 2015-05-15

I counted how many times the suspect lines in {{LocalActor::reviveOffers}} ran: {color:red}*1685343501*{color} times after 18 hours of running.
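To put the reported count in perspective, it can be converted into an average rate. This is a back-of-the-envelope sketch whose only inputs are the 1685343501 count, the 18-hour duration and the 5-second batch interval from this report; the comparison assumes that one retry chain scheduled every 1000 ms accounts for roughly one execution per second, which is an assumption rather than a measured fact.

```scala
// Figures taken from this report (2015-05-15 update and settings).
val executions = 1685343501L  // times the suspect lines ran
val seconds = 18L * 60 * 60   // 18 hours = 64800 seconds
val batchIntervalSeconds = 5  // streaming batch interval

// Average rate over the whole run, and the same rate per streaming batch.
val perSecond = executions.toDouble / seconds
val perBatch = perSecond * batchIntervalSeconds

// Assumption: a single 1-second retry chain contributes about one execution per second,
// so perSecond also approximates the average number of concurrent retry chains.
println(f"about $perSecond%.0f executions per second, $perBatch%.0f per 5-second batch")
// prints: about 26008 executions per second, 130042 per 5-second batch
```

Under that assumption, the count corresponds to roughly 26,000 retry chains running at once on average, rather than the single 1-second retry the {{if}} block appears intended to produce.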