Repository: spark
Updated Branches:
  refs/heads/master 5011f264f -> 4106d80fb
[SPARK-12122][STREAMING] Prevent batches from being submitted twice after recovering StreamingContext from checkpoint

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #10127 from tdas/SPARK-12122.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4106d80f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4106d80f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4106d80f

Branch: refs/heads/master
Commit: 4106d80fb6a16713a6cd2f15ab9d60f2527d9be5
Parents: 5011f26
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Fri Dec 4 01:42:29 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Fri Dec 4 01:42:29 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/4106d80f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 2de035d..8dfdc1f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -220,7 +220,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     logInfo("Batches pending processing (" + pendingTimes.size + " batches): " +
       pendingTimes.mkString(", "))
     // Reschedule jobs for these times
-    val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
+    val timesToReschedule = (pendingTimes ++ downTimes).filter { _ < restartTime }
+      .distinct.sorted(Time.ordering)
     logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
       timesToReschedule.mkString(", "))
     timesToReschedule.foreach { time =>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org