Repository: bahir Updated Branches: refs/heads/master 50ecf2058 -> d43dad219
[BAHIR-64] add Akka streaming test (send/receive) This PR adds the test suite AkkaStreamSuite.scala to the streaming connector streaming-akka to test data being sent and received. Closes #24 Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/d43dad21 Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/d43dad21 Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/d43dad21 Branch: refs/heads/master Commit: d43dad21963d2ba338acc44d6233ff020cef7d38 Parents: 50ecf20 Author: Christian Kadner <[email protected]> Authored: Wed Sep 28 12:41:35 2016 -0700 Committer: Luciano Resende <[email protected]> Committed: Thu Nov 17 13:05:16 2016 +0100 ---------------------------------------------------------------------- NOTICE | 2 +- .../spark/streaming/akka/ActorReceiver.scala | 1 + .../spark/streaming/akka/AkkaStreamSuite.scala | 106 +++++++++++++++++++ 3 files changed, 108 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/d43dad21/NOTICE ---------------------------------------------------------------------- diff --git a/NOTICE b/NOTICE index 8bf7751..7067f1e 100644 --- a/NOTICE +++ b/NOTICE @@ -2,4 +2,4 @@ Apache Bahir Copyright (c) 2016 The Apache Software Foundation. This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). \ No newline at end of file +The Apache Software Foundation (http://www.apache.org/). http://git-wip-us.apache.org/repos/asf/bahir/blob/d43dad21/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala ---------------------------------------------------------------------- diff --git a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala index e3be880..d30e380 100644 --- a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala +++ b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala @@ -65,6 +65,7 @@ object ActorReceiver { val akkaConf = ConfigFactory.parseString( s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.enabled-transports = ["akka.remote.netty.tcp"] + |akka.remote.netty.tcp.port = "0" |""".stripMargin) ActorSystem(uniqueSystemName, akkaConf) } http://git-wip-us.apache.org/repos/asf/bahir/blob/d43dad21/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala ---------------------------------------------------------------------- diff --git a/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala new file mode 100644 index 0000000..e52bf0e --- /dev/null +++ b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.akka + +import java.util.concurrent.ConcurrentLinkedQueue + +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ + +import akka.actor._ +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.Eventually + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.streaming.{Milliseconds, StreamingContext} + +class AkkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter { + + private var ssc: StreamingContext = _ + + private var actorSystem: ActorSystem = _ + + after { + if (ssc != null) { + ssc.stop() + ssc = null + } + if (actorSystem != null) { + actorSystem.shutdown() + actorSystem.awaitTermination(30.seconds) + actorSystem = null + } + } + + test("actor input stream") { + val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) + ssc = new StreamingContext(sparkConf, Milliseconds(500)) + + // we set the TCP port to "0" to have the port chosen automatically for the Feeder actor and + // the Receiver actor will "pick it up" from the Feeder URI when it subscribes to the Feeder + // actor (http://doc.akka.io/docs/akka/2.3.11/scala/remoting.html) + val akkaConf = ConfigFactory.parseMap( + Map( + "akka.actor.provider" -> "akka.remote.RemoteActorRefProvider", + "akka.remote.netty.tcp.transport-class" -> "akka.remote.transport.netty.NettyTransport", + "akka.remote.netty.tcp.port" -> "0"). + asJava) + actorSystem = ActorSystem("test", akkaConf) + actorSystem.actorOf(Props(classOf[FeederActor]), "FeederActor") + val feederUri = + actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + "/user/FeederActor" + + val actorStream = + AkkaUtils.createStream[String](ssc, Props(classOf[TestActorReceiver], feederUri), + "TestActorReceiver") + val result = new ConcurrentLinkedQueue[String] + actorStream.foreachRDD { rdd => + rdd.collect().foreach(result.add) + } + ssc.start() + + eventually(timeout(10.seconds), interval(10.milliseconds)) { + assert((1 to 10).map(_.toString) === result.asScala.toList) + } + } +} + +case class SubscribeReceiver(receiverActor: ActorRef) + +class FeederActor extends Actor { + + def receive: Receive = { + case SubscribeReceiver(receiverActor: ActorRef) => + (1 to 10).foreach(i => receiverActor ! i.toString()) + } +} + +class TestActorReceiver(uriOfPublisher: String) extends ActorReceiver { + + lazy private val remotePublisher = context.actorSelection(uriOfPublisher) + + override def preStart(): Unit = { + remotePublisher ! SubscribeReceiver(self) + } + + def receive: PartialFunction[Any, Unit] = { + case msg: String => store(msg) + } + +}
