Updated Branches:
  refs/heads/master 6dbd2208f -> b5346064d

Serialize and restore spark.cleaner.ttl to savepoint


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/fbe40c58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/fbe40c58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/fbe40c58

Branch: refs/heads/master
Commit: fbe40c5806a01e38c56b12a09bc4b84681a99602
Parents: a106ed8
Author: Vadim Chekan <kot.bege...@gmail.com>
Authored: Fri Sep 20 12:13:48 2013 -0700
Committer: Vadim Chekan <kot.bege...@gmail.com>
Committed: Fri Sep 20 12:13:48 2013 -0700

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/streaming/Checkpoint.scala   | 2 ++
 .../main/scala/org/apache/spark/streaming/StreamingContext.scala | 4 ++++
 2 files changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fbe40c58/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 2d8f072..bb9feba 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.Logging
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.MetadataCleaner
 
 
 private[streaming]
@@ -40,6 +41,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val checkpointDir = ssc.checkpointDir
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.jobManager.getPendingTimes()
+  val delaySeconds = MetadataCleaner.getDelaySeconds
 
   def validate() {
     assert(master != null, "Checkpoint.master is null")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fbe40c58/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 878725c..098081d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -100,6 +100,10 @@ class StreamingContext private (
       "both SparkContext and checkpoint as null")
   }
 
+  if(cp_ != null && cp_.delaySeconds >= 0 && MetadataCleaner.getDelaySeconds < 0) {
+    MetadataCleaner.setDelaySeconds(cp_.delaySeconds)
+  }
+
   if (MetadataCleaner.getDelaySeconds < 0) {
     throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; "
       + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)")

Reply via email to