Repository: spark
Updated Branches:
  refs/heads/branch-1.0 104590c91 -> bf47559c5


[streaming][SPARK-1578] Removed requirement for TTL in StreamingContext.

Since shuffles and RDDs that go out of scope are automatically cleaned up by
Spark core (using ContextCleaner), there is no longer any need to set the
cleaner TTL when creating a StreamingContext.
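
For example, a StreamingContext can now be created from an existing
SparkContext without any TTL configuration — a minimal sketch; the master
URL and app name below are illustrative:

    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Reusing an existing SparkContext without "spark.cleaner.ttl" used to
    // throw a SparkException; ContextCleaner now handles cleanup of shuffles
    // and out-of-scope RDDs automatically, so this just works.
    val sc = new SparkContext("local[2]", "NoTtlExample")
    val ssc = new StreamingContext(sc, Seconds(1))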

Author: Tathagata Das <[email protected]>

Closes #491 from tdas/ttl-fix and squashes the following commits:

cf01dc7 [Tathagata Das] Removed requirement for TTL in StreamingContext.
(cherry picked from commit f3d19a9f1a4e38ff9fb5bf78e04ed5d321219bf6)

Signed-off-by: Patrick Wendell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf47559c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf47559c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf47559c

Branch: refs/heads/branch-1.0
Commit: bf47559c5d4c3c702b31a9cbd07206a5881161a0
Parents: 104590c
Author: Tathagata Das <[email protected]>
Authored: Tue Apr 22 19:35:13 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Tue Apr 22 19:35:22 2014 -0700

----------------------------------------------------------------------
 .../spark/streaming/StreamingContext.scala      | 15 +------
 .../spark/streaming/InputStreamsSuite.scala     |  2 +-
 .../spark/streaming/StreamingContextSuite.scala | 45 ++++++--------------
 .../apache/spark/streaming/TestSuiteBase.scala  |  1 -
 4 files changed, 14 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bf47559c/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 6d9dc87..9ba6e02 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -116,11 +116,6 @@ class StreamingContext private[streaming] (
     }
   }
 
-  if (MetadataCleaner.getDelaySeconds(sc.conf) < 0) {
-    throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; "
-      + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)")
-  }
-
   private[streaming] val conf = sc.conf
 
   private[streaming] val env = SparkEnv.get
@@ -500,8 +495,6 @@ class StreamingContext private[streaming] (
 
 object StreamingContext extends Logging {
 
-  private[streaming] val DEFAULT_CLEANER_TTL = 3600
-
   implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = {
     new PairDStreamFunctions[K, V](stream)
   }
@@ -546,13 +539,7 @@ object StreamingContext extends Logging {
   def jarOfClass(cls: Class[_]): Option[String] = SparkContext.jarOfClass(cls)
 
   private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
-    // Set the default cleaner delay to an hour if not already set.
-    // This should be sufficient for even 1 second batch intervals.
-    if (MetadataCleaner.getDelaySeconds(conf) < 0) {
-      MetadataCleaner.setDelaySeconds(conf, DEFAULT_CLEANER_TTL)
-    }
-    val sc = new SparkContext(conf)
-    sc
+    new SparkContext(conf)
   }
 
   private[streaming] def createNewSparkContext(

http://git-wip-us.apache.org/repos/asf/spark/blob/bf47559c/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 46b7f63..3bad871 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -143,7 +143,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     conf.set("spark.streaming.clock", 
"org.apache.spark.streaming.util.ManualClock")
   }
 
-  // TODO: This test makes assumptions about Thread.sleep() and is flaky
+  // TODO: This test works in IntelliJ but not through SBT
   ignore("actor input stream") {
     // Start the server
     val testServer = new TestServer()

http://git-wip-us.apache.org/repos/asf/spark/blob/bf47559c/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 6d14b1f..3e2b25a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -38,15 +38,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   val batchDuration = Milliseconds(500)
   val sparkHome = "someDir"
   val envPair = "key" -> "value"
-  val ttl = StreamingContext.DEFAULT_CLEANER_TTL + 100
 
   var sc: SparkContext = null
   var ssc: StreamingContext = null
 
-  before {
-    System.clearProperty("spark.cleaner.ttl")
-  }
-
   after {
     if (ssc != null) {
       ssc.stop()
@@ -62,67 +57,51 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     ssc = new StreamingContext(master, appName, batchDuration)
     assert(ssc.sparkContext.conf.get("spark.master") === master)
     assert(ssc.sparkContext.conf.get("spark.app.name") === appName)
-    assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
-      StreamingContext.DEFAULT_CLEANER_TTL)
   }
 
   test("from no conf + spark home") {
     ssc = new StreamingContext(master, appName, batchDuration, sparkHome, Nil)
     assert(ssc.conf.get("spark.home") === sparkHome)
-    assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
-      StreamingContext.DEFAULT_CLEANER_TTL)
   }
 
   test("from no conf + spark home + env") {
     ssc = new StreamingContext(master, appName, batchDuration,
       sparkHome, Nil, Map(envPair))
     assert(ssc.conf.getExecutorEnv.exists(_ == envPair))
-    assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
-      StreamingContext.DEFAULT_CLEANER_TTL)
-  }
-
-  test("from conf without ttl set") {
-    val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    ssc = new StreamingContext(myConf, batchDuration)
-    assert(MetadataCleaner.getDelaySeconds(ssc.conf) ===
-      StreamingContext.DEFAULT_CLEANER_TTL)
   }
 
-  test("from conf with ttl set") {
+  test("from conf with settings") {
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    myConf.set("spark.cleaner.ttl", ttl.toString)
+    myConf.set("spark.cleaner.ttl", "10")
     ssc = new StreamingContext(myConf, batchDuration)
-    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === ttl)
+    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
   }
 
-  test("from existing SparkContext without ttl set") {
+  test("from existing SparkContext") {
     sc = new SparkContext(master, appName)
-    val exception = intercept[SparkException] {
-      ssc = new StreamingContext(sc, batchDuration)
-    }
-    assert(exception.getMessage.contains("ttl"))
+    ssc = new StreamingContext(sc, batchDuration)
   }
 
-  test("from existing SparkContext with ttl set") {
+  test("from existing SparkContext with settings") {
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    myConf.set("spark.cleaner.ttl", ttl.toString)
+    myConf.set("spark.cleaner.ttl", "10")
     ssc = new StreamingContext(myConf, batchDuration)
-    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === ttl)
+    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
   }
 
   test("from checkpoint") {
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    myConf.set("spark.cleaner.ttl", ttl.toString)
+    myConf.set("spark.cleaner.ttl", "10")
     val ssc1 = new StreamingContext(myConf, batchDuration)
     addInputStream(ssc1).register
     ssc1.start()
     val cp = new Checkpoint(ssc1, Time(1000))
-    assert(MetadataCleaner.getDelaySeconds(cp.sparkConf) === ttl)
+    assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
     ssc1.stop()
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
-    assert(MetadataCleaner.getDelaySeconds(newCp.sparkConf) === ttl)
+    assert(newCp.sparkConf.getInt("spark.cleaner.ttl", -1) === 10)
     ssc = new StreamingContext(null, newCp, null)
-    assert(MetadataCleaner.getDelaySeconds(ssc.conf) === ttl)
+    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
   }
 
   test("start and stop state check") {

http://git-wip-us.apache.org/repos/asf/spark/blob/bf47559c/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index aa2d5c2..4f63fd3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -137,7 +137,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
   val conf = new SparkConf()
     .setMaster(master)
     .setAppName(framework)
-    .set("spark.cleaner.ttl", StreamingContext.DEFAULT_CLEANER_TTL.toString)
 
   // Default before function for any streaming test suite. Override this
   // if you want to add your stuff to "before" (i.e., don't call before { } )
