Repository: spark
Updated Branches:
  refs/heads/master 5011f264f -> 4106d80fb


[SPARK-12122][STREAMING] Prevent batches from being submitted twice after recovering StreamingContext from checkpoint

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #10127 from tdas/SPARK-12122.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4106d80f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4106d80f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4106d80f

Branch: refs/heads/master
Commit: 4106d80fb6a16713a6cd2f15ab9d60f2527d9be5
Parents: 5011f26
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Fri Dec 4 01:42:29 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Fri Dec 4 01:42:29 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4106d80f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 2de035d..8dfdc1f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -220,7 +220,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     logInfo("Batches pending processing (" + pendingTimes.size + " batches): " +
       pendingTimes.mkString(", "))
     // Reschedule jobs for these times
-    val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
+    val timesToReschedule = (pendingTimes ++ downTimes).filter { _ < restartTime }
+      .distinct.sorted(Time.ordering)
     logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
       timesToReschedule.mkString(", "))
     timesToReschedule.foreach { time =>
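
The patched rescheduling step can be exercised in isolation with a minimal Scala sketch. `Time` below is a hypothetical stand-in for `org.apache.spark.streaming.Time` that models only the millisecond value, `<`, and an ordering; `RescheduleSketch` and the sample batch times are illustrative assumptions, not Spark code. The rationale in the comments (batches at or after `restartTime` are produced by the restarted timer, so rescheduling them as well would submit them twice) is inferred from the filter and the commit title rather than stated in the diff.

```scala
// Hypothetical stand-in for org.apache.spark.streaming.Time: only the
// millisecond value, a strict less-than, and an ordering are modelled.
final case class Time(milliseconds: Long) {
  def <(that: Time): Boolean = milliseconds < that.milliseconds
}

object Time {
  val ordering: Ordering[Time] = Ordering.by((t: Time) => t.milliseconds)
}

object RescheduleSketch {
  // Pre-patch expression: every pending or down time is rescheduled,
  // including times the restarted timer will generate again (assumption).
  def timesToRescheduleOld(pendingTimes: Seq[Time], downTimes: Seq[Time]): Seq[Time] =
    (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)

  // Patched expression: keep only times strictly before restartTime,
  // then deduplicate and sort, as in the + lines of the diff.
  def timesToReschedule(pendingTimes: Seq[Time], downTimes: Seq[Time], restartTime: Time): Seq[Time] =
    (pendingTimes ++ downTimes).filter { _ < restartTime }
      .distinct.sorted(Time.ordering)

  def main(args: Array[String]): Unit = {
    // Hypothetical sample values, in milliseconds.
    val restartTime = Time(5000)
    val pendingTimes = Seq(Time(3000), Time(5000))
    val downTimes = Seq(Time(3000), Time(4000))
    println(timesToRescheduleOld(pendingTimes, downTimes).map(_.milliseconds).mkString(", "))
    println(timesToReschedule(pendingTimes, downTimes, restartTime).map(_.milliseconds).mkString(", "))
  }
}
```

With these sample values the pre-patch expression prints `3000, 4000, 5000` and the patched one prints `3000, 4000`: the 5000 ms batch is no longer put on the reschedule list.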

