Updated Branches: refs/heads/branch-0.9 fbfbb331d -> 2859cab2f
Merge pull request #435 from tdas/filestream-fix

Fixed the flaky tests by making SparkConf not serializable

SparkConf was being serialized with CoGroupedRDD and Aggregator, which somehow
caused OptionalJavaException while being deserialized as part of a
ShuffleMapTask. SparkConf should not even be serializable (according to
conversation with Matei). This change fixes that. @mateiz @pwendell

(cherry picked from commit 139c24ef08e6ffb090975c9808a2cba304eb79e0)
Signed-off-by: Patrick Wendell <pwend...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2859cab2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2859cab2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2859cab2

Branch: refs/heads/branch-0.9
Commit: 2859cab2f50099d1a691aecb5f7e5dfa26dccdb1
Parents: fbfbb33
Author: Patrick Wendell <pwend...@gmail.com>
Authored: Tue Jan 14 23:07:55 2014 -0800
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Tue Jan 14 23:08:19 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Aggregator.scala     |  3 +--
 .../main/scala/org/apache/spark/SparkConf.scala |  3 ++-
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |  3 +--
 .../flume/src/test/resources/log4j.properties   |  2 +-
 .../kafka/src/test/resources/log4j.properties   |  2 +-
 .../streaming/kafka/KafkaStreamSuite.scala      |  1 +
 .../mqtt/src/test/resources/log4j.properties    |  2 +-
 .../spark/streaming/mqtt/MQTTStreamSuite.scala  |  1 +
 .../twitter/src/test/resources/log4j.properties |  2 +-
 .../streaming/twitter/TwitterStreamSuite.scala  |  1 +
 .../zeromq/src/test/resources/log4j.properties  |  2 +-
 .../streaming/zeromq/ZeroMQStreamSuite.scala    |  1 +
 .../org/apache/spark/streaming/Checkpoint.scala | 10 ++++---
 .../apache/spark/streaming/DStreamGraph.scala   |  2 ++
 .../dstream/DStreamCheckpointData.scala         | 26 +++++++++++++++++-
 .../spark/streaming/CheckpointSuite.scala       | 28 ++++++++++++++------
 16 files changed, 66 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index edbea6e..c4579cf 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -33,8 +33,7 @@ case class Aggregator[K, V, C] (
     mergeValue: (C, V) => C,
     mergeCombiners: (C, C) => C) {
 
-  private val sparkConf = SparkEnv.get.conf
-  private val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
+  private val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
 
   @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
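The Aggregator change above and the CoGroupedRDD change below apply the same idea: never keep a reference to the SparkConf itself in an object that is serialized with tasks. Aggregator now captures only the primitive setting it needs, while CoGroupedRDD re-reads the conf from SparkEnv inside compute(), which runs on the executor. A rough sketch of the two safe variants, using a hypothetical helper class that is not part of this commit:

    import org.apache.spark.SparkEnv

    // Hypothetical illustration only, not code from this commit.
    class ShuffleSettings extends Serializable {
      // Variant 1 (Aggregator-style): read the conf once, but keep only the
      // primitive value; a Boolean serializes fine, a SparkConf no longer does.
      private val spillEnabled: Boolean =
        SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)

      // Variant 2 (CoGroupedRDD-style): defer the lookup to the method that
      // runs inside the task, so nothing conf-related is captured at all.
      def externalSortingEnabled: Boolean =
        SparkEnv.get.conf.getBoolean("spark.shuffle.externalSorting", true)
    }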

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 93d3d1f..369c6ce 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import com.typesafe.config.ConfigFactory
+import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
 
 /**
  * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
@@ -41,7 +42,7 @@ import com.typesafe.config.ConfigFactory
  *
  * @param loadDefaults whether to load values from the system properties and classpath
  */
-class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with Logging {
+class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Create a SparkConf that loads defaults from system properties and the classpath */
   def this() = this(true)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 9c6b308..f2feb40 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -66,7 +66,6 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   private type CoGroupValue = (Any, Int)  // Int is dependency number
   private type CoGroupCombiner = Seq[CoGroup]
 
-  private val sparkConf = SparkEnv.get.conf
   private var serializerClass: String = null
 
   def setSerializer(cls: String): CoGroupedRDD[K] = {
@@ -106,7 +105,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   override val partitioner = Some(part)
 
   override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
-
+    val sparkConf = SparkEnv.get.conf
     val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size
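With Serializable removed from SparkConf, an accidental capture of a conf in a serialized object now fails immediately and loudly, instead of producing a stream that only breaks later while a ShuffleMapTask is being deserialized. A quick way to see the new behaviour (a throwaway sketch, not code from this commit):

    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
    import org.apache.spark.SparkConf

    object SerializeConfSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false).set("spark.app.name", "demo")
        val out = new ObjectOutputStream(new ByteArrayOutputStream())
        try {
          out.writeObject(conf)  // SparkConf is no longer java.io.Serializable
          println("unexpected: SparkConf was serialized")
        } catch {
          case e: NotSerializableException =>
            println("as intended: " + e)
        } finally {
          out.close()
        }
      }
    }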

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/external/flume/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties
index 063529a..d1bd73a 100644
--- a/external/flume/src/test/resources/log4j.properties
+++ b/external/flume/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/flume/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/external/kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/resources/log4j.properties b/external/kafka/src/test/resources/log4j.properties
index 063529a..38910d1 100644
--- a/external/kafka/src/test/resources/log4j.properties
+++ b/external/kafka/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/kafka/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 9c81f23..d9809f6 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -35,5 +35,6 @@ class KafkaStreamSuite extends TestSuiteBase {
       ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
 
     // TODO: Actually test receiving data
+    ssc.stop()
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/external/mqtt/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/resources/log4j.properties b/external/mqtt/src/test/resources/log4j.properties
index 063529a..d0462c7 100644
--- a/external/mqtt/src/test/resources/log4j.properties
+++ b/external/mqtt/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/mqtt/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 73e7ce6..89c40ad 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -32,5 +32,6 @@ class MQTTStreamSuite extends TestSuiteBase {
     val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
 
     // TODO: Actually test receiving data
+    ssc.stop()
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/external/twitter/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/twitter/src/test/resources/log4j.properties b/external/twitter/src/test/resources/log4j.properties
index 063529a..c918335 100644
--- a/external/twitter/src/test/resources/log4j.properties
+++ b/external/twitter/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/twitter/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
index ccc3878..06ab0cd 100644
--- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -39,5 +39,6 @@ class TwitterStreamSuite extends TestSuiteBase {
 
     // Note that actually testing the data receiving is hard as authentication keys are
     // necessary for accessing Twitter live stream
+    ssc.stop()
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/external/zeromq/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/resources/log4j.properties b/external/zeromq/src/test/resources/log4j.properties
index 063529a..304683d 100644
--- a/external/zeromq/src/test/resources/log4j.properties
+++ b/external/zeromq/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=external/zeromq/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index 4193b8a..92d55a7 100644
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -40,5 +40,6 @@ class ZeroMQStreamSuite extends TestSuiteBase {
       StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
 
     // TODO: Actually test data receiving
+    ssc.stop()
   }
 }
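Each of the external-module suites above now calls ssc.stop() at the end of its test body; without it, the StreamingContext (and its SparkContext) leaks into the next test and can make these suites flaky. If a test body can fail part-way through, a slightly more defensive variant is to stop the context in a finally block. A sketch of that shape (placeholder master, app name, and batch interval; this is not what TestSuiteBase currently does):

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Inside a test body:
    val ssc = new StreamingContext("local[2]", "stop-even-on-failure", Seconds(1))
    try {
      // set up the input stream(s) and assertions here
    } finally {
      ssc.stop()  // always release the underlying SparkContext
    }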

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 5046a1d..4d778dc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -42,11 +42,13 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.getPendingTimes().toArray
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
-  val sparkConf = ssc.conf
+  val sparkConfPairs = ssc.conf.getAll
 
-  // These should be unset when a checkpoint is deserialized,
-  // otherwise the SparkContext won't initialize correctly.
-  sparkConf.remove("spark.driver.host").remove("spark.driver.port")
+  def sparkConf = {
+    new SparkConf(false).setAll(sparkConfPairs)
+      .remove("spark.driver.host")
+      .remove("spark.driver.port")
+  }
 
   def validate() {
     assert(master != null, "Checkpoint.master is null")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 8faa79f..0683113 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -163,8 +163,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
     logDebug("DStreamGraph.writeObject used")
     this.synchronized {
       checkpointInProgress = true
+      logDebug("Enabled checkpoint mode")
       oos.defaultWriteObject()
       checkpointInProgress = false
+      logDebug("Disabled checkpoint mode")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 38bad5a..906a16e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.dstream
 
 import scala.collection.mutable.HashMap
 import scala.reflect.ClassTag
-import java.io.{ObjectInputStream, IOException}
+import java.io.{ObjectOutputStream, ObjectInputStream, IOException}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.Logging
@@ -118,7 +118,31 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
   }
 
   @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream) {
+    logDebug(this.getClass().getSimpleName + ".writeObject used")
+    if (dstream.context.graph != null) {
+      dstream.context.graph.synchronized {
+        if (dstream.context.graph.checkpointInProgress) {
+          oos.defaultWriteObject()
+        } else {
+          val msg = "Object of " + this.getClass.getName + " is being serialized " +
+            " possibly as a part of closure of an RDD operation. This is because " +
+            " the DStream object is being referred to from within the closure. " +
+            " Please rewrite the RDD operation inside this DStream to avoid this. " +
+            " This has been enforced to avoid bloating of Spark tasks " +
+            " with unnecessary objects."
+          throw new java.io.NotSerializableException(msg)
+        }
+      }
+    } else {
+      throw new java.io.NotSerializableException(
+        "Graph is unexpectedly null when DStream is being serialized.")
+    }
+  }
+
+  @throws(classOf[IOException])
   private def readObject(ois: ObjectInputStream) {
+    logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     timeToOldestCheckpointFileTime = new HashMap[Time, Time]
     timeToCheckpointFile = new HashMap[Time, String]
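The new writeObject guard above turns a subtle failure mode into an explicit error: if user code refers to a DStream from inside an RDD operation, the DStream (and this checkpoint data with it) gets pulled into the task closure, and serialization now fails with the message shown. An illustration of the anti-pattern and the usual fix, with made-up stream names that are not part of this commit:

    import org.apache.spark.streaming.dstream.DStream

    // Hypothetical example only: `words` stands for any DStream[String].
    def bad(words: DStream[String]) {
      words.foreachRDD { rdd =>
        // Referring to `words` here drags the whole DStream into the closure,
        // which now throws NotSerializableException when the task is shipped.
        rdd.map(w => (w, words.slideDuration)).count()
      }
    }

    def good(words: DStream[String]) {
      val slide = words.slideDuration  // capture only the plain value
      words.foreachRDD { rdd =>
        rdd.map(w => (w, slide)).count()
      }
    }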

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2859cab2/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 89daf47..831e7c1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -151,17 +151,29 @@ class CheckpointSuite extends TestSuiteBase {
     val value = "myvalue"
     System.setProperty(key, value)
     ssc = new StreamingContext(master, framework, batchDuration)
+    val originalConf = ssc.conf
+
     val cp = new Checkpoint(ssc, Time(1000))
-    assert(!cp.sparkConf.contains("spark.driver.host"))
-    assert(!cp.sparkConf.contains("spark.driver.port"))
-    assert(!cp.sparkConf.contains("spark.hostPort"))
-    assert(cp.sparkConf.get(key) === value)
+    val cpConf = cp.sparkConf
+    assert(cpConf.get("spark.master") === originalConf.get("spark.master"))
+    assert(cpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+    assert(cpConf.get(key) === value)
     ssc.stop()
+
+    // Serialize/deserialize to simulate write to storage and reading it back
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
-    assert(!newCp.sparkConf.contains("spark.driver.host"))
-    assert(!newCp.sparkConf.contains("spark.driver.port"))
-    assert(!newCp.sparkConf.contains("spark.hostPort"))
-    assert(newCp.sparkConf.get(key) === value)
+
+    val newCpConf = newCp.sparkConf
+    assert(newCpConf.get("spark.master") === originalConf.get("spark.master"))
+    assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+    assert(newCpConf.get(key) === value)
+    assert(!newCpConf.contains("spark.driver.host"))
+    assert(!newCpConf.contains("spark.driver.port"))
+
+    // Check if all the parameters have been restored
+    ssc = new StreamingContext(null, newCp, null)
+    val restoredConf = ssc.conf
+    assert(restoredConf.get(key) === value)
   }
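The updated test captures the essential contract of the reworked Checkpoint: only plain (key, value) pairs are written out, and a fresh SparkConf is rebuilt from them on recovery, minus the driver host and port entries. A standalone sketch of that round trip (the keys and values below are illustrative, not taken from the commit):

    import org.apache.spark.SparkConf

    object ConfRoundTripSketch {
      def main(args: Array[String]): Unit = {
        // What the checkpoint stores: string pairs, which serialize trivially.
        val pairs: Array[(String, String)] = Array(
          ("spark.master", "local[2]"),
          ("spark.app.name", "demo"),
          ("spark.driver.host", "old-driver"),
          ("spark.driver.port", "12345"))

        // What recovery does: rebuild a conf and drop the old driver binding
        // so the restarted driver can pick a fresh host and port.
        val rebuilt = new SparkConf(false)
          .setAll(pairs)
          .remove("spark.driver.host")
          .remove("spark.driver.port")

        assert(rebuilt.get("spark.app.name") == "demo")
        assert(!rebuilt.contains("spark.driver.host"))
      }
    }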