Repository: spark Updated Branches: refs/heads/master e26db9be4 -> c928796ad
[SPARK-6331] Load new master URL if present when recovering streaming context from checkpoint In streaming driver recovery, when the SparkConf is reconstructed based on the checkpointed configuration, it recovers the old master URL. This okay if the cluster on which the streaming application is relaunched is the same cluster as it was running before. But if that cluster changes, there is no way to inject the new master URL of the new cluster. As a result, the restarted app tries to connect to the non-existent old cluster and fails. The solution is to check whether a master URL is set in the System properties (by Spark submit) before recreating the SparkConf. If a new master url is set in the properties, then use it as that is obviously the most relevant one. Otherwise load the old one (to maintain existing behavior). Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #5024 from tdas/SPARK-6331 and squashes the following commits: 392fd44 [Tathagata Das] Fixed naming issue. c7c0b99 [Tathagata Das] Addressed comments. 6a0857c [Tathagata Das] Updated testsuites. 222485d [Tathagata Das] Load new master URL if present when recovering streaming context from checkpoint Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c928796a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c928796a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c928796a Branch: refs/heads/master Commit: c928796ade54f68e26bc55734a9867a046d2e3fe Parents: e26db9be Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Tue Mar 17 05:31:27 2015 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Tue Mar 17 05:31:27 2015 -0700 ---------------------------------------------------------------------- .../org/apache/spark/streaming/Checkpoint.scala | 7 +++++-- .../spark/streaming/StreamingContext.scala | 2 +- .../spark/streaming/CheckpointSuite.scala | 21 +++++++++++++++++--- .../spark/streaming/StreamingContextSuite.scala | 2 +- 4 files changed, 25 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c928796a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index f88a8a0..cb4c94f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -43,10 +43,13 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf) val sparkConfPairs = ssc.conf.getAll - def sparkConf = { - new SparkConf(false).setAll(sparkConfPairs) + def createSparkConf(): SparkConf = { + val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs) .remove("spark.driver.host") .remove("spark.driver.port") + val newMasterOption = new SparkConf(loadDefaults = true).getOption("spark.master") + newMasterOption.foreach { newMaster => newSparkConf.setMaster(newMaster) } + newSparkConf } def validate() { http://git-wip-us.apache.org/repos/asf/spark/blob/c928796a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index b5b6770..543224d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -116,7 +116,7 @@ class StreamingContext private[streaming] ( private[streaming] val sc: SparkContext = { if (isCheckpointPresent) { - new SparkContext(cp_.sparkConf) + new SparkContext(cp_.createSparkConf()) } else { sc_ } http://git-wip-us.apache.org/repos/asf/spark/blob/c928796a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 03c448f..8ea91ec 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -146,7 +146,7 @@ class CheckpointSuite extends TestSuiteBase { // This tests whether spark conf persists through checkpoints, and certain // configs gets scrubbed - test("persistence of conf through checkpoints") { + test("recovery of conf through checkpoints") { val key = "spark.mykey" val value = "myvalue" System.setProperty(key, value) @@ -154,7 +154,7 @@ class CheckpointSuite extends TestSuiteBase { val originalConf = ssc.conf val cp = new Checkpoint(ssc, Time(1000)) - val cpConf = cp.sparkConf + val cpConf = cp.createSparkConf() assert(cpConf.get("spark.master") === originalConf.get("spark.master")) assert(cpConf.get("spark.app.name") === originalConf.get("spark.app.name")) assert(cpConf.get(key) === value) @@ -163,7 +163,8 @@ class CheckpointSuite extends TestSuiteBase { // Serialize/deserialize to simulate write to storage and reading it back val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp)) - val newCpConf = newCp.sparkConf + // Verify new SparkConf has all the previous properties + val newCpConf = newCp.createSparkConf() assert(newCpConf.get("spark.master") === originalConf.get("spark.master")) assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name")) assert(newCpConf.get(key) === value) @@ -174,6 +175,20 @@ class CheckpointSuite extends TestSuiteBase { ssc = new StreamingContext(null, newCp, null) val restoredConf = ssc.conf assert(restoredConf.get(key) === value) + ssc.stop() + + // Verify new SparkConf picks up new master url if it is set in the properties. See SPARK-6331. + try { + val newMaster = "local[100]" + System.setProperty("spark.master", newMaster) + val newCpConf = newCp.createSparkConf() + assert(newCpConf.get("spark.master") === newMaster) + assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name")) + ssc = new StreamingContext(null, newCp, null) + assert(ssc.sparkContext.master === newMaster) + } finally { + System.clearProperty("spark.master") + } } http://git-wip-us.apache.org/repos/asf/spark/blob/c928796a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 6a7cd97..2e5005e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -100,7 +100,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10") ssc1.stop() val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp)) - assert(newCp.sparkConf.getInt("spark.cleaner.ttl", -1) === 10) + assert(newCp.createSparkConf().getInt("spark.cleaner.ttl", -1) === 10) ssc = new StreamingContext(null, newCp, null) assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org