[
https://issues.apache.org/jira/browse/SPARK-7624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14543325#comment-14543325
]
Sean Owen commented on SPARK-7624:
----------------------------------
[~davies], thoughts on the impact of those few lines?
> Task scheduler delay is increasing time over time in spark local mode
> ---------------------------------------------------------------------
>
> Key: SPARK-7624
> URL: https://issues.apache.org/jira/browse/SPARK-7624
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.3.1
> Reporter: Jack Hu
> Labels: delay, schedule
>
> I am running a simple Spark Streaming program with Spark 1.3.1 in local mode.
> It receives JSON strings from a socket at a rate of 50 events per second. It
> runs well for the first 6 hours (although the minor GC count per minute keeps
> increasing), but after that the scheduler delay of every task grows noticeably
> from 10 ms to 100 ms; after 10 hours of running, the task delay is about
> 800 ms and CPU usage has climbed from 2% to 30%. As a result the streaming job
> can no longer finish within one batch interval (5 seconds). I dumped the Java
> heap after 16 hours and can see about 200000
> {{org.apache.spark.scheduler.local.ReviveOffers}} objects in
> {{akka.actor.LightArrayRevolverScheduler$TaskQueue[]}}. I then checked the
> code and found only one place that may put {{ReviveOffers}} into the Akka
> {{LightArrayRevolverScheduler}}: {{LocalActor::reviveOffers}}
> {code}
> def reviveOffers() {
>   val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
>   val tasks = scheduler.resourceOffers(offers).flatten
>   for (task <- tasks) {
>     freeCores -= scheduler.CPUS_PER_TASK
>     executor.launchTask(executorBackend, taskId = task.taskId,
>       attemptNumber = task.attemptNumber, task.name, task.serializedTask)
>   }
>   if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
>     // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
>     context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
>   }
> }
> {code}
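> A rough toy model (not Spark code) of how those pending messages could pile
> up: assume every {{reviveOffers}} call that hits the empty-tasks branch starts
> its own 1-second re-schedule chain and the chains are never merged. The
> numbers below are hypothetical, only to show the growth pattern:
> {code}
> object ReviveOffersPileUp {
>   def main(args: Array[String]): Unit = {
>     val newChainsPerSecond = 3   // hypothetical: offers per second that find no launchable task
>     var pendingMessages = 0L     // delayed ReviveOffers alive in the Akka timer wheel
>     for (second <- 1 to 16 * 3600) {   // simulate 16 hours, one step per second
>       // every existing chain re-schedules itself (count unchanged), and a few
>       // new chains are started by offers that could not be satisfied
>       pendingMessages += newChainsPerSecond
>       if (second % 3600 == 0)
>         println(s"hour ${second / 3600}: $pendingMessages pending ReviveOffers")
>     }
>   }
> }
> {code}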
> I removed the last three lines of this method (the whole {{if}} block, which
> was introduced by https://issues.apache.org/jira/browse/SPARK-4939), and it
> ran smoothly for 20 hours with a scheduler delay of about 10 ms the whole
> time. So there must be some condition under which {{ReviveOffers}} gets
> scheduled repeatedly. I am not sure why this happens, but I suspect it is the
> root cause of this issue.
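> One possible guard, sketched only for illustration (not a tested fix): keep a
> flag so that at most one delayed {{ReviveOffers}} is pending at a time, and
> clear it when the message is handled:
> {code}
> // Hypothetical change inside LocalActor; a single flag is enough because the
> // actor handles messages one at a time.
> private var reviveOffersPending = false
>
> def reviveOffers() {
>   val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
>   val tasks = scheduler.resourceOffers(offers).flatten
>   for (task <- tasks) {
>     freeCores -= scheduler.CPUS_PER_TASK
>     executor.launchTask(executorBackend, taskId = task.taskId,
>       attemptNumber = task.attemptNumber, task.name, task.serializedTask)
>   }
>   // only re-schedule if no delayed ReviveOffers is already in flight
>   if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty && !reviveOffersPending) {
>     reviveOffersPending = true
>     context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
>   }
> }
>
> // in receive: case ReviveOffers => reviveOffersPending = false; reviveOffers()
> {code}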
> My Spark settings:
> # Memory: 3G
> # CPU: 8 cores
> # Streaming batch interval: 5 seconds
> Here is my streaming code:
> {code}
> val input = ssc.socketTextStream(
>   hostname, port, StorageLevel.MEMORY_ONLY_SER).mapPartitions(
>   // parse the json to Order
>   Order(_), preservePartitioning = true)
> val mresult = input.map(
>   v => (v.customer, UserSpending(v.customer, v.count * v.price, v.timestamp.toLong))).cache()
> val tempr = mresult.window(
>   Seconds(firstStageWindowSize),
>   Seconds(firstStageWindowSize)
> ).transform(
>   rdd => rdd.union(rdd).union(rdd).union(rdd)
> )
> tempr.count.print
> tempr.cache().foreachRDD((rdd, t) => {
>   for (i <- 1 to 5) {
>     val c = rdd.filter(x => scala.util.Random.nextInt(5) == i).count()
>     println("""T: """ + t + """: """ + c)
>   }
> })
> {code}