Kousuke Saruta created SPARK-6769:
-------------------------------------
Summary: Usage of the ListenerBus in YarnClusterSuite is wrong
Key: SPARK-6769
URL: https://issues.apache.org/jira/browse/SPARK-6769
Project: Spark
Issue Type: Bug
Components: Spark Core, Tests, YARN
Affects Versions: 1.4.0
Reporter: Kousuke Saruta
Priority: Minor
In YarnClusterSuite, a test case uses `SaveExecutorInfo` to handle
ExecutorAddedEvent as follows.
{code}
private class SaveExecutorInfo extends SparkListener {
  val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
  override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
    addedExecutorInfos(executor.executorId) = executor.executorInfo
  }
}
...
listener = new SaveExecutorInfo
val sc = new SparkContext(new SparkConf()
  .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
sc.addSparkListener(listener)
val status = new File(args(0))
var result = "failure"
try {
  val data = sc.parallelize(1 to 4, 4).collect().toSet
  assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
  data should be (Set(1, 2, 3, 4))
  result = "success"
} finally {
  sc.stop()
  Files.write(result, status, UTF_8)
}
{code}
However, this usage is wrong. Executors are spawned while SparkContext is
being initialized, but SparkContext#addSparkListener can only be invoked after
initialization has finished, that is, after the executors have already
spawned. As a result, SaveExecutorInfo never gets to handle ExecutorAddedEvent.
The following code relies on the result of handling ExecutorAddedEvent. Because
the listener is registered too late to receive the event, addedExecutorInfos
stays empty and the assertion inside the loop is never reached.
{code}
// verify log urls are present
listener.addedExecutorInfos.values.foreach { info =>
  assert(info.logUrlMap.nonEmpty)
}
{code}
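One possible way to avoid the ordering problem is sketched below. It assumes the `spark.extraListeners` configuration property is available in the affected version, so that Spark itself instantiates and registers the listener while SparkContext is being initialized. This is a sketch, not necessarily the fix that should be applied to YarnClusterSuite. The names `ListenerRegistrationSketch` and `buildConf`, the companion-object map and the `local[2]` master are hypothetical and chosen only for illustration.

```scala
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
import org.apache.spark.scheduler.cluster.ExecutorInfo

// Spark creates the listener by reflection, so the test has no reference to
// the instance. Results are therefore kept in a shared, thread-safe map
// (an assumption of this sketch, not part of the original test).
object SaveExecutorInfo {
  val addedExecutorInfos = new ConcurrentHashMap[String, ExecutorInfo]()
}

// Must be a non-nested class with a zero-argument constructor so that it can
// be instantiated from its class name.
class SaveExecutorInfo extends SparkListener {
  override def onExecutorAdded(executor: SparkListenerExecutorAdded): Unit = {
    SaveExecutorInfo.addedExecutorInfos.put(executor.executorId, executor.executorInfo)
  }
}

object ListenerRegistrationSketch {
  // Register the listener through configuration instead of calling
  // sc.addSparkListener after the context has been initialized.
  def buildConf(): SparkConf = new SparkConf()
    .setAppName("listener registration sketch")
    .setMaster("local[2]") // hypothetical; the real test runs on YARN
    .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(buildConf())
    try {
      sc.parallelize(1 to 4, 4).collect()
    } finally {
      sc.stop()
    }
    // Whether any executor is reported depends on the Spark version and the
    // cluster manager, so no particular count is asserted here.
    println("executors recorded: " + SaveExecutorInfo.addedExecutorInfos.size())
  }
}
```

With this arrangement the listener is attached to the listener bus before executors start registering, so the log URL check would iterate over `SaveExecutorInfo.addedExecutorInfos` rather than over a listener instance created by the test.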