Repository: spark
Updated Branches:
  refs/heads/master fd1dcfaf2 -> 8113dbda0
[STREAMING][DOCS][EXAMPLES] Minor fixes Author: Jacek Laskowski <[email protected]> Closes #10603 from jaceklaskowski/streaming-actor-custom-receiver. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8113dbda Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8113dbda Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8113dbda Branch: refs/heads/master Commit: 8113dbda0bd51fdbe20dbfad466b8d25304a01f4 Parents: fd1dcfa Author: Jacek Laskowski <[email protected]> Authored: Thu Jan 7 00:27:13 2016 -0800 Committer: Reynold Xin <[email protected]> Committed: Thu Jan 7 00:27:13 2016 -0800 ---------------------------------------------------------------------- docs/streaming-custom-receivers.md | 8 ++++---- .../apache/spark/examples/streaming/ActorWordCount.scala | 10 ++++------ 2 files changed, 8 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8113dbda/docs/streaming-custom-receivers.md ---------------------------------------------------------------------- diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index a75587a..97db865 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -257,9 +257,9 @@ The following table summarizes the characteristics of both types of receivers ## Implementing and Using a Custom Actor-based Receiver -Custom [Akka Actors](http://doc.akka.io/docs/akka/2.2.4/scala/actors.html) can also be used to +Custom [Akka Actors](http://doc.akka.io/docs/akka/2.3.11/scala/actors.html) can also be used to receive data. 
The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper) -trait can be applied on any Akka actor, which allows received data to be stored in Spark using +trait can be mixed in to any Akka actor, which allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of this actor can be configured to handle failures, etc. {% highlight scala %} @@ -273,8 +273,8 @@ class CustomActor extends Actor with ActorHelper { And a new input stream can be created with this custom actor as {% highlight scala %} -// Assuming ssc is the StreamingContext -val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver") +val ssc: StreamingContext = ... +val lines = ssc.actorStream[String](Props[CustomActor], "CustomReceiver") {% endhighlight %} See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala) http://git-wip-us.apache.org/repos/asf/spark/blob/8113dbda/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala index 8b8dae0..a47fb7b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala @@ -62,15 +62,13 @@ class FeederActor extends Actor { }.start() def receive: Receive = { - case SubscribeReceiver(receiverActor: ActorRef) => println("received subscribe from %s".format(receiverActor.toString)) - receivers = LinkedList(receiverActor) ++ receivers + receivers = LinkedList(receiverActor) ++ receivers case UnsubscribeReceiver(receiverActor: ActorRef) => println("received unsubscribe from %s".format(receiverActor.toString)) 
-    receivers = receivers.dropWhile(x => x eq receiverActor)
-
+      receivers = receivers.dropWhile(x => x eq receiverActor)
   }
 }
 
@@ -129,9 +127,9 @@ object FeederActor {
  *   <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
  *
  * To run this example locally, you may run Feeder Actor as
- *    `$ bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999`
+ *    `$ bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.0.1 9999`
  * and then run the example
- *    `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount 127.0.1.1 9999`
+ *    `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount 127.0.0.1 9999`
  */
 object ActorWordCount {
   def main(args: Array[String]) {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
