Repository: spark Updated Branches: refs/heads/master fec641b84 -> a45d5480f
[SPARK-2464][Streaming] Fixed Twitter stream stopping bug Stopping the Twitter Receiver would call twitter4j's TwitterStream.shutdown, which in turn caused an Exception to be thrown to the listener. This exception caused the Receiver to be restarted. This patch checks whether the receiver was stopped, and restarts it on exception only if it was not. Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #1577 from tdas/twitter-stop and squashes the following commits: 011b525 [Tathagata Das] Fixed Twitter stream stopping bug. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a45d5480 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a45d5480 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a45d5480 Branch: refs/heads/master Commit: a45d5480f65d2e969fc7fbd8f358b1717fb99bef Parents: fec641b Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Thu Jul 24 15:59:09 2014 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Thu Jul 24 15:59:09 2014 -0700 ---------------------------------------------------------------------- .../spark/streaming/twitter/TwitterInputDStream.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a45d5480/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala ---------------------------------------------------------------------- diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala index 5ea2e55..4eacc47 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala @@ -63,7 +63,8 @@ 
class TwitterReceiver( storageLevel: StorageLevel ) extends Receiver[Status](storageLevel) with Logging { - private var twitterStream: TwitterStream = _ + @volatile private var twitterStream: TwitterStream = _ + @volatile private var stopped = false def onStart() { try { @@ -78,7 +79,9 @@ class TwitterReceiver( def onScrubGeo(l: Long, l1: Long) {} def onStallWarning(stallWarning: StallWarning) {} def onException(e: Exception) { - restart("Error receiving tweets", e) + if (!stopped) { + restart("Error receiving tweets", e) + } } }) @@ -91,12 +94,14 @@ class TwitterReceiver( } setTwitterStream(newTwitterStream) logInfo("Twitter receiver started") + stopped = false } catch { case e: Exception => restart("Error starting Twitter stream", e) } } def onStop() { + stopped = true setTwitterStream(null) logInfo("Twitter receiver stopped") }