Updated Branches: refs/heads/master 6dbd2208f -> b5346064d
Serialize and restore spark.cleaner.ttl to savepoint

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/fbe40c58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/fbe40c58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/fbe40c58

Branch: refs/heads/master
Commit: fbe40c5806a01e38c56b12a09bc4b84681a99602
Parents: a106ed8
Author: Vadim Chekan <kot.bege...@gmail.com>
Authored: Fri Sep 20 12:13:48 2013 -0700
Committer: Vadim Chekan <kot.bege...@gmail.com>
Committed: Fri Sep 20 12:13:48 2013 -0700

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 2 ++
 .../main/scala/org/apache/spark/streaming/StreamingContext.scala | 4 ++++
 2 files changed, 6 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fbe40c58/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 2d8f072..bb9feba 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.Logging
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.MetadataCleaner
 
 
 private[streaming]
@@ -40,6 +41,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val checkpointDir = ssc.checkpointDir
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.jobManager.getPendingTimes()
+  val delaySeconds = MetadataCleaner.getDelaySeconds
 
   def validate() {
     assert(master != null, "Checkpoint.master is null")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fbe40c58/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 878725c..098081d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -100,6 +100,10 @@ class StreamingContext private (
       "both SparkContext and checkpoint as null")
   }
 
+  if(cp_ != null && cp_.delaySeconds >= 0 && MetadataCleaner.getDelaySeconds < 0) {
+    MetadataCleaner.setDelaySeconds(cp_.delaySeconds)
+  }
+
   if (MetadataCleaner.getDelaySeconds < 0) {
     throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; " +
       "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)")