git commit: [SPARK-2464][Streaming] Fixed Twitter stream stopping bug
Repository: spark Updated Branches: refs/heads/master fec641b84 -> a45d5480f [SPARK-2464][Streaming] Fixed Twitter stream stopping bug Stopping the Twitter Receiver would call twitter4j's TwitterStream.shutdown, which in turn causes an Exception to be thrown to the listener. This exception caused the Receiver to be restarted. This patch checks whether the receiver was stopped, and restarts on exception only if it was not. Author: Tathagata Das tathagata.das1...@gmail.com Closes #1577 from tdas/twitter-stop and squashes the following commits: 011b525 [Tathagata Das] Fixed Twitter stream stopping bug. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a45d5480 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a45d5480 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a45d5480 Branch: refs/heads/master Commit: a45d5480f65d2e969fc7fbd8f358b1717fb99bef Parents: fec641b Author: Tathagata Das tathagata.das1...@gmail.com Authored: Thu Jul 24 15:59:09 2014 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Thu Jul 24 15:59:09 2014 -0700 -- .../spark/streaming/twitter/TwitterInputDStream.scala | 9 +++-- 1 file changed, 7 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a45d5480/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala -- diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala index 5ea2e55..4eacc47 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala @@ -63,7 +63,8 @@ class TwitterReceiver( storageLevel: StorageLevel ) extends Receiver[Status](storageLevel) with Logging { - private var twitterStream: 
TwitterStream = _ + @volatile private var twitterStream: TwitterStream = _ + @volatile private var stopped = false def onStart() { try { @@ -78,7 +79,9 @@ class TwitterReceiver( def onScrubGeo(l: Long, l1: Long) {} def onStallWarning(stallWarning: StallWarning) {} def onException(e: Exception) { - restart("Error receiving tweets", e) + if (!stopped) { +restart("Error receiving tweets", e) + } } }) @@ -91,12 +94,14 @@ class TwitterReceiver( } setTwitterStream(newTwitterStream) logInfo("Twitter receiver started") + stopped = false } catch { case e: Exception => restart("Error starting Twitter stream", e) } } def onStop() { +stopped = true setTwitterStream(null) logInfo("Twitter receiver stopped") }
git commit: [SPARK-2464][Streaming] Fixed Twitter stream stopping bug
Repository: spark Updated Branches: refs/heads/branch-1.0 91241592a -> 53b4e0f95 [SPARK-2464][Streaming] Fixed Twitter stream stopping bug Stopping the Twitter Receiver would call twitter4j's TwitterStream.shutdown, which in turn causes an Exception to be thrown to the listener. This exception caused the Receiver to be restarted. This patch checks whether the receiver was stopped, and restarts on exception only if it was not. Author: Tathagata Das tathagata.das1...@gmail.com Closes #1577 from tdas/twitter-stop and squashes the following commits: 011b525 [Tathagata Das] Fixed Twitter stream stopping bug. (cherry picked from commit a45d5480f65d2e969fc7fbd8f358b1717fb99bef) Signed-off-by: Tathagata Das tathagata.das1...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53b4e0f9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53b4e0f9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53b4e0f9 Branch: refs/heads/branch-1.0 Commit: 53b4e0f95750f371db14f8da4b5c4a1c38301710 Parents: 9124159 Author: Tathagata Das tathagata.das1...@gmail.com Authored: Thu Jul 24 15:59:09 2014 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Thu Jul 24 16:00:49 2014 -0700 -- .../spark/streaming/twitter/TwitterInputDStream.scala | 9 +++-- 1 file changed, 7 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/53b4e0f9/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala -- diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala index 5ea2e55..4eacc47 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala @@ -63,7 +63,8 @@ 
class TwitterReceiver( storageLevel: StorageLevel ) extends Receiver[Status](storageLevel) with Logging { - private var twitterStream: TwitterStream = _ + @volatile private var twitterStream: TwitterStream = _ + @volatile private var stopped = false def onStart() { try { @@ -78,7 +79,9 @@ class TwitterReceiver( def onScrubGeo(l: Long, l1: Long) {} def onStallWarning(stallWarning: StallWarning) {} def onException(e: Exception) { - restart("Error receiving tweets", e) + if (!stopped) { +restart("Error receiving tweets", e) + } } }) @@ -91,12 +94,14 @@ class TwitterReceiver( } setTwitterStream(newTwitterStream) logInfo("Twitter receiver started") + stopped = false } catch { case e: Exception => restart("Error starting Twitter stream", e) } } def onStop() { +stopped = true setTwitterStream(null) logInfo("Twitter receiver stopped") }