This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fc88d3df [SPARK-27164][CORE] RDD.countApprox on empty RDDs schedules
jobs which never complete
fc88d3df is described below
commit fc88d3df5c53e0ade04af8ad9006ed543950c467
Author: Ajith <[email protected]>
AuthorDate: Sun Mar 17 12:56:41 2019 -0500
[SPARK-27164][CORE] RDD.countApprox on empty RDDs schedules jobs which
never complete
## What changes were proposed in this pull request?
When the Result stage has zero tasks, the Job End event is never fired, hence
the Job always shows as running in the UI. Example: sc.emptyRDD[Int].countApprox(1000)
never finishes even though it has no tasks to launch
## How was this patch tested?
Added UT
Closes #24100 from ajithme/emptyRDD.
Authored-by: Ajith <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
---
.../scala/org/apache/spark/scheduler/DAGScheduler.scala | 11 +++++++++--
.../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 13 +++++++++++++
2 files changed, 22 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index dd1b259..9177c1b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -767,10 +767,17 @@ private[spark] class DAGScheduler(
callSite: CallSite,
timeout: Long,
properties: Properties): PartialResult[R] = {
- val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
- val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val partitions = (0 until rdd.partitions.length).toArray
val jobId = nextJobId.getAndIncrement()
+ if (partitions.isEmpty) {
+ // Return immediately if the job is running 0 tasks
+ val time = clock.getTimeMillis()
+ listenerBus.post(SparkListenerJobStart(jobId, time, Seq[StageInfo](),
properties))
+ listenerBus.post(SparkListenerJobEnd(jobId, time, JobSucceeded))
+ return new PartialResult(evaluator.currentResult(), true)
+ }
+ val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
+ val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions, callSite, listener,
SerializationUtils.clone(properties)))
listener.awaitResult() // Will throw an exception if the job fails
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index e17d264..e74f462 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler
import java.util.Properties
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.annotation.meta.param
@@ -2849,6 +2850,18 @@ class DAGSchedulerSuite extends SparkFunSuite with
LocalSparkContext with TimeLi
}
}
+ test("SPARK-27164: RDD.countApprox on empty RDDs schedules jobs which never
complete") {
+ val latch = new CountDownLatch(1)
+ val jobListener = new SparkListener {
+ override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+ latch.countDown()
+ }
+ }
+ sc.addSparkListener(jobListener)
+ sc.emptyRDD[Int].countApprox(10000).getFinalValue()
+ assert(latch.await(10, TimeUnit.SECONDS))
+ }
+
/**
* Assert that the supplied TaskSet has exactly the given hosts as its
preferred locations.
* Note that this checks only the host and not the executor ID.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]