git commit: [SPARK-2464][Streaming] Fixed Twitter stream stopping bug

2014-07-24 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master fec641b84 -> a45d5480f


[SPARK-2464][Streaming] Fixed Twitter stream stopping bug

Stopping the Twitter Receiver would call twitter4j's TwitterStream.shutdown,
which in turn causes an Exception to be thrown to the listener. This exception
caused the Receiver to be restarted. This patch checks whether the receiver was
stopped, and restarts on an exception only if it was not.
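
The guard described above can be sketched in isolation. This is a minimal
illustration, not the patched class: GuardedReceiver, restartCount and the
simulated shutdown exception are hypothetical stand-ins, and neither Spark's
Receiver nor twitter4j is used.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a streaming receiver; not Spark's Receiver API.
class GuardedReceiver {
  // Written by the thread that calls onStart/onStop and read by the listener
  // callback, so it is @volatile to make updates visible across threads.
  @volatile private var stopped = false

  // Counts restart requests so the behaviour can be observed from outside.
  val restartCount = new AtomicInteger(0)

  def onStart(): Unit = {
    stopped = false
  }

  def onStop(): Unit = {
    // Set the flag before shutting the source down, so the exception that
    // the shutdown triggers is recognised as expected.
    stopped = true
    // Assumption: simulates the exception the stream delivers on shutdown.
    onException(new RuntimeException("stream shut down"))
  }

  // Listener callback: request a restart only while the receiver is running.
  def onException(e: Exception): Unit = {
    if (!stopped) {
      restartCount.incrementAndGet()
    }
  }
}

object GuardedReceiverDemo {
  def main(args: Array[String]): Unit = {
    val receiver = new GuardedReceiver
    receiver.onStart()
    receiver.onException(new RuntimeException("network error")) // counted
    receiver.onStop()                                           // not counted
    println(receiver.restartCount.get())
  }
}
```

In this sketch only the failure raised while the receiver is running is
counted as a restart request; the exception raised during onStop is ignored.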

Author: Tathagata Das tathagata.das1...@gmail.com

Closes #1577 from tdas/twitter-stop and squashes the following commits:

011b525 [Tathagata Das] Fixed Twitter stream stopping bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a45d5480
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a45d5480
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a45d5480

Branch: refs/heads/master
Commit: a45d5480f65d2e969fc7fbd8f358b1717fb99bef
Parents: fec641b
Author: Tathagata Das tathagata.das1...@gmail.com
Authored: Thu Jul 24 15:59:09 2014 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Thu Jul 24 15:59:09 2014 -0700

--
 .../spark/streaming/twitter/TwitterInputDStream.scala   | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a45d5480/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
--
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 5ea2e55..4eacc47 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -63,7 +63,8 @@ class TwitterReceiver(
     storageLevel: StorageLevel
   ) extends Receiver[Status](storageLevel) with Logging {
 
-  private var twitterStream: TwitterStream = _
+  @volatile private var twitterStream: TwitterStream = _
+  @volatile private var stopped = false
 
   def onStart() {
     try {
@@ -78,7 +79,9 @@ class TwitterReceiver(
         def onScrubGeo(l: Long, l1: Long) {}
         def onStallWarning(stallWarning: StallWarning) {}
         def onException(e: Exception) {
-          restart("Error receiving tweets", e)
+          if (!stopped) {
+            restart("Error receiving tweets", e)
+          }
         }
       })
 
@@ -91,12 +94,14 @@ class TwitterReceiver(
       }
       setTwitterStream(newTwitterStream)
       logInfo("Twitter receiver started")
+      stopped = false
     } catch {
       case e: Exception => restart("Error starting Twitter stream", e)
     }
   }
 
   def onStop() {
+    stopped = true
     setTwitterStream(null)
     logInfo("Twitter receiver stopped")
   }



git commit: [SPARK-2464][Streaming] Fixed Twitter stream stopping bug

2014-07-24 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 91241592a -> 53b4e0f95


[SPARK-2464][Streaming] Fixed Twitter stream stopping bug

Stopping the Twitter Receiver would call twitter4j's TwitterStream.shutdown,
which in turn causes an Exception to be thrown to the listener. This exception
caused the Receiver to be restarted. This patch checks whether the receiver was
stopped, and restarts on an exception only if it was not.

Author: Tathagata Das tathagata.das1...@gmail.com

Closes #1577 from tdas/twitter-stop and squashes the following commits:

011b525 [Tathagata Das] Fixed Twitter stream stopping bug.

(cherry picked from commit a45d5480f65d2e969fc7fbd8f358b1717fb99bef)
Signed-off-by: Tathagata Das tathagata.das1...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53b4e0f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53b4e0f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53b4e0f9

Branch: refs/heads/branch-1.0
Commit: 53b4e0f95750f371db14f8da4b5c4a1c38301710
Parents: 9124159
Author: Tathagata Das tathagata.das1...@gmail.com
Authored: Thu Jul 24 15:59:09 2014 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Thu Jul 24 16:00:49 2014 -0700

--
 .../spark/streaming/twitter/TwitterInputDStream.scala   | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/53b4e0f9/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
--
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 5ea2e55..4eacc47 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -63,7 +63,8 @@ class TwitterReceiver(
     storageLevel: StorageLevel
   ) extends Receiver[Status](storageLevel) with Logging {
 
-  private var twitterStream: TwitterStream = _
+  @volatile private var twitterStream: TwitterStream = _
+  @volatile private var stopped = false
 
   def onStart() {
     try {
@@ -78,7 +79,9 @@ class TwitterReceiver(
         def onScrubGeo(l: Long, l1: Long) {}
         def onStallWarning(stallWarning: StallWarning) {}
         def onException(e: Exception) {
-          restart("Error receiving tweets", e)
+          if (!stopped) {
+            restart("Error receiving tweets", e)
+          }
         }
       })
 
@@ -91,12 +94,14 @@ class TwitterReceiver(
       }
       setTwitterStream(newTwitterStream)
       logInfo("Twitter receiver started")
+      stopped = false
     } catch {
       case e: Exception => restart("Error starting Twitter stream", e)
     }
   }
 
   def onStop() {
+    stopped = true
     setTwitterStream(null)
     logInfo("Twitter receiver stopped")
   }