Repository: spark Updated Branches: refs/heads/master bff65b5cc -> 5a5526164
SPARK-5425: Use synchronised methods in system properties to create SparkConf SPARK-5425: Fixed usages of system properties This patch fixes few problems caused by the fact that the Scala wrapper over system properties is not thread-safe and is basically invalid because it doesn't take into account the default values which could have been set in the properties object. The problem is fixed by modifying `Utils.getSystemProperties` method so that it uses `stringPropertyNames` method of the `Properties` class, which is thread-safe (internally it creates a defensive copy in a synchronized method) and returns keys of the properties which were set explicitly and which are defined as defaults. The other related problem, which is fixed here. was in `ResetSystemProperties` mix-in. It created a copy of the system properties in the wrong way. This patch also introduces a test case for thread-safeness of SparkConf creation. Refer to the discussion in https://github.com/apache/spark/pull/4220 for more details. Author: Jacek Lewandowski <[email protected]> Closes #4222 from jacek-lewandowski/SPARK-5425-1.3 and squashes the following commits: 03da61b [Jacek Lewandowski] SPARK-5425: Modified Utils.getSystemProperties to return a map of all system properties - explicit + defaults 8faf2ea [Jacek Lewandowski] SPARK-5425: Use SerializationUtils to save properties in ResetSystemProperties trait 71aa572 [Jacek Lewandowski] SPARK-5425: Use synchronised methods in system properties to create SparkConf Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a552616 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a552616 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a552616 Branch: refs/heads/master Commit: 5a5526164bdf9ecf1306d4570e816eb4df5cfd2b Parents: bff65b5 Author: Jacek Lewandowski <[email protected]> Authored: Mon Feb 2 14:07:19 2015 -0800 Committer: Josh Rosen <[email protected]> Committed: Mon Feb 2 14:07:19 2015 -0800 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/SparkConf.scala | 5 ++-- .../scala/org/apache/spark/util/Utils.scala | 11 ++++++--- .../scala/org/apache/spark/SparkConfSuite.scala | 25 ++++++++++++++++++++ .../spark/util/ResetSystemProperties.scala | 7 +++++- .../spark/examples/DriverSubmissionTest.scala | 4 +++- 5 files changed, 45 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5a552616/core/src/main/scala/org/apache/spark/SparkConf.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index cd91c8f..4d4c69d 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.LinkedHashSet import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.util.Utils /** * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs. @@ -53,8 +54,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { if (loadDefaults) { // Load any spark.* system properties - for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) { - set(k, v) + for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) { + set(key, value) } } http://git-wip-us.apache.org/repos/asf/spark/blob/5a552616/core/src/main/scala/org/apache/spark/util/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 703b23a..31850b5 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1347,9 +1347,14 @@ private[spark] object Utils extends Logging { hashAbs } - /** Returns a copy of the system properties that is thread-safe to iterator over. */ - def getSystemProperties(): Map[String, String] = { - System.getProperties.clone().asInstanceOf[java.util.Properties].toMap[String, String] + /** Returns the system properties map that is thread-safe to iterator over. It gets the + * properties which have been set explicitly, as well as those for which only a default value + * has been defined. */ + def getSystemProperties: Map[String, String] = { + val sysProps = for (key <- System.getProperties.stringPropertyNames()) yield + (key, System.getProperty(key)) + + sysProps.toMap } /** http://git-wip-us.apache.org/repos/asf/spark/blob/5a552616/core/src/test/scala/org/apache/spark/SparkConfSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 790976a..e08210a 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -17,6 +17,10 @@ package org.apache.spark +import java.util.concurrent.{TimeUnit, Executors} + +import scala.util.{Try, Random} + import org.scalatest.FunSuite import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer} import org.apache.spark.util.ResetSystemProperties @@ -123,6 +127,27 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro assert(conf.get("spark.test.a.b.c") === "a.b.c") } + test("Thread safeness - SPARK-5425") { + import scala.collection.JavaConversions._ + val executor = Executors.newSingleThreadScheduledExecutor() + val sf = executor.scheduleAtFixedRate(new Runnable { + override def run(): Unit = + System.setProperty("spark.5425." + Random.nextInt(), Random.nextInt().toString) + }, 0, 1, TimeUnit.MILLISECONDS) + + try { + val t0 = System.currentTimeMillis() + while ((System.currentTimeMillis() - t0) < 1000) { + val conf = Try(new SparkConf(loadDefaults = true)) + assert(conf.isSuccess === true) + } + } finally { + executor.shutdownNow() + for (key <- System.getProperties.stringPropertyNames() if key.startsWith("spark.5425.")) + System.getProperties.remove(key) + } + } + test("register kryo classes through registerKryoClasses") { val conf = new SparkConf().set("spark.kryo.registrationRequired", "true") http://git-wip-us.apache.org/repos/asf/spark/blob/5a552616/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala index d4b92f3..bad1aa9 100644 --- a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala +++ b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala @@ -19,6 +19,7 @@ package org.apache.spark.util import java.util.Properties +import org.apache.commons.lang3.SerializationUtils import org.scalatest.{BeforeAndAfterEach, Suite} /** @@ -42,7 +43,11 @@ private[spark] trait ResetSystemProperties extends BeforeAndAfterEach { this: Su var oldProperties: Properties = null override def beforeEach(): Unit = { - oldProperties = new Properties(System.getProperties) + // we need SerializationUtils.clone instead of `new Properties(System.getProperties()` because + // the later way of creating a copy does not copy the properties but it initializes a new + // Properties object with the given properties as defaults. They are not recognized at all + // by standard Scala wrapper over Java Properties then. + oldProperties = SerializationUtils.clone(System.getProperties) super.beforeEach() } http://git-wip-us.apache.org/repos/asf/spark/blob/5a552616/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala index 65251e9..e757283 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala @@ -19,6 +19,8 @@ package org.apache.spark.examples import scala.collection.JavaConversions._ +import org.apache.spark.util.Utils + /** Prints out environmental information, sleeps, and then exits. Made to * test driver submission in the standalone scheduler. */ object DriverSubmissionTest { @@ -30,7 +32,7 @@ object DriverSubmissionTest { val numSecondsToSleep = args(0).toInt val env = System.getenv() - val properties = System.getProperties() + val properties = Utils.getSystemProperties println("Environment variables containing SPARK_TEST:") env.filter{case (k, v) => k.contains("SPARK_TEST")}.foreach(println) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
