Repository: bahir Updated Branches: refs/heads/master f0d9a84f7 -> 889de659c
[BAHIR-97] Akka as SQL Streaming datasource. Closes #38. Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/889de659 Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/889de659 Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/889de659 Branch: refs/heads/master Commit: 889de659c33dd56bad7193a4b69e6d05d061a2fd Parents: f0d9a84 Author: Subhobrata Dey <sbc...@gmail.com> Authored: Sun Mar 26 21:30:30 2017 -0700 Committer: Luciano Resende <lrese...@apache.org> Committed: Thu Apr 6 08:05:09 2017 -0700 ---------------------------------------------------------------------- pom.xml | 1 + sql-streaming-akka/README.md | 111 +++++++ .../streaming/akka/JavaAkkaStreamWordCount.java | 95 ++++++ .../streaming/akka/AkkaStreamWordCount.scala | 72 +++++ sql-streaming-akka/pom.xml | 120 ++++++++ .../src/main/assembly/assembly.xml | 44 +++ .../sql/streaming/akka/AkkaStreamSource.scala | 294 +++++++++++++++++++ .../bahir/sql/streaming/akka/MessageStore.scala | 83 ++++++ .../org/apache/bahir/utils/BahirUtils.scala | 47 +++ .../scala/org/apache/bahir/utils/Logging.scala | 24 ++ .../src/test/resources/feeder_actor.conf | 34 +++ .../src/test/resources/log4j.properties | 27 ++ .../streaming/akka/AkkaStreamSourceSuite.scala | 191 ++++++++++++ .../sql/streaming/akka/AkkaTestUtils.scala | 93 ++++++ 14 files changed, 1236 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 73cac1f..65129cd 100644 --- a/pom.xml +++ b/pom.xml @@ -77,6 +77,7 @@ <modules> <module>sql-cloudant</module> <module>streaming-akka</module> + <module>sql-streaming-akka</module> <module>streaming-mqtt</module> <module>sql-streaming-mqtt</module> <module>streaming-twitter</module> 
http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/README.md ---------------------------------------------------------------------- diff --git a/sql-streaming-akka/README.md b/sql-streaming-akka/README.md new file mode 100644 index 0000000..b64a8e2 --- /dev/null +++ b/sql-streaming-akka/README.md @@ -0,0 +1,111 @@ +A library for reading data from Akka Actors using Spark SQL Streaming (or Structured Streaming). + +## Linking + +Using SBT: + + libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-akka" % "2.2.0-SNAPSHOT" + +Using Maven: + + <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>spark-sql-streaming-akka_2.11</artifactId> + <version>2.2.0-SNAPSHOT</version> + </dependency> + +This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option. +For example, to include it when starting the spark shell: + + $ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-akka_2.11:2.2.0-SNAPSHOT + +Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath. +The `--packages` argument can also be used with `bin/spark-submit`. + +This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards. + +## Examples + +A SQL Stream can be created with data streams received from an Akka Feeder actor using: + + sqlContext.readStream + .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider") + .option("urlOfPublisher", "feederActorUri") + .load() + +## Enable recovering from failures + +Setting a value for the `persistenceDirPath` option helps in recovering from a restart, by restoring the state from where it left off before the shutdown.
+ + sqlContext.readStream + .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider") + .option("urlOfPublisher", "feederActorUri") + .option("persistenceDirPath", "/path/to/localdir") + .load() + +## Configuration options + +This source uses the [Akka Actor API](http://doc.akka.io/api/akka/2.4/akka/actor/Actor.html). + +* `urlOfPublisher` The URL of the Publisher or Feeder actor that the Receiver actor connects to. Set this to the TCP URL of the Publisher or Feeder actor. If it is not set, the `path` option (e.g. the argument of `load(path)`) is used instead. +* `persistenceDirPath` The local directory in which incoming messages are stored on disk. Defaults to the value of the `java.io.tmpdir` system property. + +### Scala API + +An example, using the Scala API, that counts words from an incoming message stream. + + // Create DataFrame representing the stream of input lines from connection + // to publisher or feeder actor + val lines = spark.readStream + .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider") + .option("urlOfPublisher", urlOfPublisher) + .load().as[(String, Timestamp)] + + // Split the lines into words + val words = lines.map(_._1).flatMap(_.split(" ")) + + // Generate running word count + val wordCounts = words.groupBy("value").count() + + // Start running the query that prints the running counts to the console + val query = wordCounts.writeStream + .outputMode("complete") + .format("console") + .start() + + query.awaitTermination() + +Please see `AkkaStreamWordCount.scala` for the full example. + +### Java API + +An example, using the Java API, that counts words from an incoming message stream.
+ + // Create DataFrame representing the stream of input lines from connection + // to publisher or feeder actor + Dataset<String> lines = spark + .readStream() + .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider") + .option("urlOfPublisher", urlOfPublisher) + .load().select("value").as(Encoders.STRING()); + + // Split the lines into words + Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() { + @Override + public Iterator<String> call(String s) throws Exception { + return Arrays.asList(s.split(" ")).iterator(); + } + }, Encoders.STRING()); + + // Generate running word count + Dataset<Row> wordCounts = words.groupBy("value").count(); + + // Start running the query that prints the running counts to the console + StreamingQuery query = wordCounts.writeStream() + .outputMode("complete") + .format("console") + .start(); + + query.awaitTermination(); + +Please see `JavaAkkaStreamWordCount.java` for full example. http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/examples/src/main/java/org/apache/bahir/examples/sql/streaming/akka/JavaAkkaStreamWordCount.java ---------------------------------------------------------------------- diff --git a/sql-streaming-akka/examples/src/main/java/org/apache/bahir/examples/sql/streaming/akka/JavaAkkaStreamWordCount.java b/sql-streaming-akka/examples/src/main/java/org/apache/bahir/examples/sql/streaming/akka/JavaAkkaStreamWordCount.java new file mode 100644 index 0000000..59146ae --- /dev/null +++ b/sql-streaming-akka/examples/src/main/java/org/apache/bahir/examples/sql/streaming/akka/JavaAkkaStreamWordCount.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bahir.examples.sql.streaming.akka; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.streaming.StreamingQuery; + +import java.util.Arrays; +import java.util.Iterator; + +/** + * Counts words in text messages received from an Akka Feeder Actor system; each message is + * treated as one input line and split on spaces. + * + * Usage: JavaAkkaStreamWordCount <urlOfPublisher> + * <urlOfPublisher> provides the uri of the publisher or feeder actor that Structured Streaming + * would connect to receive data. + * + * To run this on your local machine, a Feeder Actor System should be up and running.
+ * + */ +public final class JavaAkkaStreamWordCount { + + public static void main(String[] args) throws Exception { + if (args.length < 1) { + System.err.println("Usage: JavaAkkaStreamWordCount <urlOfPublisher>"); + System.exit(1); + } + + // Lower the root log level to WARN only when no log4j appenders are configured + if (!Logger.getRootLogger().getAllAppenders().hasMoreElements()) { + Logger.getRootLogger().setLevel(Level.WARN); + } + + String urlOfPublisher = args[0]; + + SparkConf sparkConf = new SparkConf().setAppName("JavaAkkaStreamWordCount"); + + // check Spark configuration for master URL, set it to local if not configured + if (!sparkConf.contains("spark.master")) { + sparkConf.setMaster("local[4]"); + } + + SparkSession spark = SparkSession.builder() + .config(sparkConf) + .getOrCreate(); + + // Create DataFrame representing the stream of input lines from connection + // to publisher or feeder actor; only the message text ("value" column) is kept + Dataset<String> lines = spark + .readStream() + .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider") + .option("urlOfPublisher", urlOfPublisher) + .load().select("value").as(Encoders.STRING()); + + // Split the lines into words + Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() { + @Override + public Iterator<String> call(String s) throws Exception { + return Arrays.asList(s.split(" ")).iterator(); + } + }, Encoders.STRING()); + + // Generate running word count + Dataset<Row> wordCounts = words.groupBy("value").count(); + + // Start running the query that prints the running counts to the console + StreamingQuery query = wordCounts.writeStream() + .outputMode("complete") + .format("console") + .start(); + + query.awaitTermination(); + } +} http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/akka/AkkaStreamWordCount.scala ---------------------------------------------------------------------- diff --git
a/sql-streaming-akka/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/akka/AkkaStreamWordCount.scala b/sql-streaming-akka/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/akka/AkkaStreamWordCount.scala new file mode 100644 index 0000000..8c4185a --- /dev/null +++ b/sql-streaming-akka/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/akka/AkkaStreamWordCount.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bahir.examples.sql.streaming.akka + +import java.sql.Timestamp + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession + +/** + * Counts words in text messages received from an Akka Feeder Actor system; each message is + * treated as one input line and split on spaces. + * + * Usage: AkkaStreamWordCount <urlOfPublisher> + * <urlOfPublisher> provides the uri of the publisher or feeder actor that Structured Streaming + * would connect to receive data. + * + * To run this on your local machine, a Feeder Actor System should be up and running. + * When no Spark master is configured, the example runs with master "local[4]".
+ * + */ +object AkkaStreamWordCount { + def main(args: Array[String]): Unit = { + if (args.length < 1) { + // scalastyle:off println + System.err.println("Usage: AkkaStreamWordCount <urlOfPublisher>") + // scalastyle:on println + System.exit(1) + } + + val urlOfPublisher = args(0) + + // Respect a master supplied by spark-submit (--master / spark.master) and only fall back + // to a local master when none is configured. + val sparkConf = new SparkConf().setAppName("AkkaStreamWordCount") + if (!sparkConf.contains("spark.master")) { + sparkConf.setMaster("local[4]") + } + + val spark = SparkSession + .builder() + .config(sparkConf) + .getOrCreate() + + import spark.implicits._ + + // Create DataFrame representing the stream of input lines from connection + // to publisher or feeder actor + val lines = spark.readStream + .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider") + .option("urlOfPublisher", urlOfPublisher) + .load().as[(String, Timestamp)] + + // Split the lines into words + val words = lines.map(_._1).flatMap(_.split(" ")) + + // Generate running word count + val wordCounts = words.groupBy("value").count() + + // Start running the query that prints the running counts to the console + val query = wordCounts.writeStream + .outputMode("complete") + .format("console") + .start() + + query.awaitTermination() + } +} http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/pom.xml ---------------------------------------------------------------------- diff --git a/sql-streaming-akka/pom.xml b/sql-streaming-akka/pom.xml new file mode 100644 index 0000000..4d7040b --- /dev/null +++ b/sql-streaming-akka/pom.xml @@ -0,0 +1,120 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License.
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-parent_2.11</artifactId> + <version>2.2.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <groupId>org.apache.bahir</groupId> + <artifactId>spark-sql-streaming-akka_2.11</artifactId> + <properties> + <sbt.project.name>sql-streaming-akka</sbt.project.name> + </properties> + <packaging>jar</packaging> + <name>Apache Bahir - Spark SQL Streaming Akka</name> + <url>http://bahir.apache.org</url> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-tags_${scala.binary.version}</artifactId> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${akka.group}</groupId> + 
<artifactId>akka-actor_${scala.binary.version}</artifactId> + <version>${akka.version}</version> + </dependency> + <dependency> + <groupId>${akka.group}</groupId> + <artifactId>akka-remote_${scala.binary.version}</artifactId> + <version>${akka.version}</version> + </dependency> + <dependency> + <groupId>${akka.group}</groupId> + <artifactId>akka-slf4j_${scala.binary.version}</artifactId> + <version>${akka.version}</version> + </dependency> + <dependency> + <groupId>org.rocksdb</groupId> + <artifactId>rocksdbjni</artifactId> + <version>5.1.2</version> + </dependency> + </dependencies> + + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + </plugin> + + <!-- Assemble a jar with test dependencies for Python tests --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>test-jar-with-dependencies</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <!-- Make sure the file path is same as the sbt build --> + <finalName>spark-streaming-akka-test-${project.version}</finalName> + <outputDirectory>${project.build.directory}/scala-${scala.binary.version}</outputDirectory> + <appendAssemblyId>false</appendAssemblyId> + <!-- Don't publish it since it's only for Python tests --> + <attach>false</attach> + <descriptors> + <descriptor>src/main/assembly/assembly.xml</descriptor> + </descriptors> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/main/assembly/assembly.xml ---------------------------------------------------------------------- diff --git 
a/sql-streaming-akka/src/main/assembly/assembly.xml b/sql-streaming-akka/src/main/assembly/assembly.xml new file mode 100644 index 0000000..58a95a0 --- /dev/null +++ b/sql-streaming-akka/src/main/assembly/assembly.xml @@ -0,0 +1,44 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> +<assembly> + <id>test-jar-with-dependencies</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + + <fileSets> + <fileSet> + <directory>${project.build.directory}/scala-${scala.binary.version}/test-classes</directory> + <outputDirectory></outputDirectory> + </fileSet> + </fileSets> + + <dependencySets> + <dependencySet> + <useTransitiveDependencies>true</useTransitiveDependencies> + <scope>test</scope> + <unpack>true</unpack> + <excludes> + <exclude>org.apache.hadoop:*:jar</exclude> + <exclude>org.apache.zookeeper:*:jar</exclude> + <exclude>org.apache.avro:*:jar</exclude> + </excludes> + </dependencySet> + </dependencySets> + +</assembly> http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala new file mode 100644 index 0000000..96d892f --- /dev/null +++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bahir.sql.streaming.akka + +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.{Calendar, Objects} +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.concurrent.TrieMap +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration._ +import scala.concurrent.Future +import scala.language.postfixOps +import scala.util.{Failure, Success, Try} + +import akka.actor._ +import akka.actor.SupervisorStrategy.{Escalate, Restart} +import akka.pattern.ask +import akka.util.Timeout +import com.typesafe.config.ConfigFactory +import org.rocksdb.{Options, RocksDB} + +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source} +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + +import org.apache.bahir.utils.Logging + +object AkkaStreamConstants { + + val SCHEMA_DEFAULT = StructType(StructField("value", StringType) + :: StructField("timestamp", TimestampType) :: Nil) + + // NOTE: SimpleDateFormat is not thread-safe; do not share this instance between threads. + val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + + val defaultSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = + 15 millis) { + case _: RuntimeException => Restart + case _: Exception => Escalate + } + + val defaultActorSystemCreator: () => ActorSystem = () => { +// val uniqueSystemName =
s"streaming-actor-system-${TaskContext.get().taskAttemptId()}" + val uniqueSystemName = s"streaming-actor-system" + val akkaConf = ConfigFactory.parseString( + s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" + |akka.remote.enabled-transports = ["akka.remote.netty.tcp"] + |akka.remote.netty.tcp.port = "0" + |akka.loggers.0 = "akka.event.slf4j.Slf4jLogger" + |akka.log-dead-letters-during-shutdown = "off" + """.stripMargin) + ActorSystem(uniqueSystemName, akkaConf) + } +} + +case class SubscribeReceiver(receiverActor: ActorRef) +case class UnsubscribeReceiver(receiverActor: ActorRef) + +case class Statistics(numberOfMsgs: Int, + numberOfWorkers: Int, + numberOfHiccups: Int, + otherInfo: String) + +private[akka] sealed trait ActorReceiverData +private[akka] case class SingleItemData(item: String) extends ActorReceiverData +private[akka] case class AskStoreSingleItemData(item: String) extends ActorReceiverData +private[akka] case class IteratorData(iterator: Iterator[String]) extends ActorReceiverData +private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData +private[akka] object Ack extends ActorReceiverData + +/** + * Structured Streaming [[Source]] fed by a receiver actor that subscribes to the publisher actor + * at `urlOfPublisher`. Every received message gets the next integer id, is buffered in memory + * and is written to `persistence` once it has been returned by [[getBatch]]. + */ +class AkkaStreamSource(urlOfPublisher: String, + persistence: RocksDB, sqlContext: SQLContext, + messageParser: String => (String, Timestamp)) + extends Source with Logging { + + override def schema: StructType = AkkaStreamConstants.SCHEMA_DEFAULT + + private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf) + + private val messages = new TrieMap[Int, (String, Timestamp)]() + + private val initLock = new CountDownLatch(1) + + // Id of the last message placed in `messages`. Written by the Supervisor actor and read by + // Spark in getOffset on another thread, so it must be @volatile. + @volatile private var offset = 0 + + private var actorSystem: ActorSystem = _ + private var actorSupervisor: ActorRef = _ + + private def fetchLastProcessedOffset(): Int = { + Try(store.maxProcessedOffset) match { + case Success(x) => + log.info(s"Recovering from last stored offset $x") + x + case Failure(e) => + log.warn("Could not read last stored offset, starting from 0", e) + 0 + } + } + + initialize() + private def initialize(): Unit = { +
class ActorReceiver(urlOfPublisher: String) extends Actor { + + lazy private val remotePublisher = context.actorSelection(urlOfPublisher) + + override def preStart(): Unit = remotePublisher ! SubscribeReceiver(context.self) + + override def receive: PartialFunction[Any, Unit] = { + case msg: String => store(msg) + } + + override def postStop(): Unit = remotePublisher ! UnsubscribeReceiver(context.self) + + def store(iter: Iterator[String]) = { + context.parent ! IteratorData(iter) + } + + def store(item: String) = { + context.parent ! SingleItemData(item) + } + + def store(item: String, timeout: Timeout): Future[Unit] = { + context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher) + } + } + + class Supervisor extends Actor { + override val supervisorStrategy = AkkaStreamConstants.defaultSupervisorStrategy + + private val props = Props(new ActorReceiver(urlOfPublisher)) + private val name = "ActorReceiver" + private val worker = context.actorOf(props, name) + log.info("Started receiver actor at:" + worker.path) + + private val n: AtomicInteger = new AtomicInteger(0) + private val hiccups: AtomicInteger = new AtomicInteger(0) + + override def receive: PartialFunction[Any, Unit] = { + + case data => + initLock.await() + // Messages get consecutive ids starting right after the current offset. The offset only + // advances by the number of messages actually stored, so control messages (Props, + // Statistics, ...) and empty iterators do not leave gaps in the id sequence. + val first = offset + 1 + + val stored: Int = data match { + case IteratorData(iterator) => + log.debug("received iterator") + iterator.foldLeft(0) { (count, record) => + messages.put(first + count, messageParser(record)) + count + 1 + } + + case SingleItemData(msg) => + log.debug("received single") + messages.put(first, messageParser(msg)) + 1 + + case AskStoreSingleItemData(msg) => + log.debug("received single sync") + messages.put(first, messageParser(msg)) + sender() !
Ack + 1 + + case ByteBufferData(bytes) => + log.debug("received bytes") + // Decode only the readable bytes as UTF-8; duplicate() leaves the sender's buffer + // position untouched. + val text = StandardCharsets.UTF_8.decode(bytes.duplicate()).toString + messages.put(first, messageParser(text)) + 1 + + case props: Props => + val worker = context.actorOf(props) + log.info("Started receiver worker at:" + worker.path) + sender() ! worker + 0 + + case (props: Props, name: String) => + val worker = context.actorOf(props, name) + log.info("Started receiver worker at:" + worker.path) + sender() ! worker + 0 + + case _: PossiblyHarmful => + hiccups.incrementAndGet() + 0 + + case _: Statistics => + val workers = context.children + sender() ! Statistics(n.get(), workers.size, hiccups.get(), workers.mkString("\n")) + 0 + + case other => + log.warn(s"Ignoring unexpected message: $other") + 0 + } + n.addAndGet(stored) + offset += stored + } + } + + actorSystem = AkkaStreamConstants.defaultActorSystemCreator() + actorSupervisor = actorSystem.actorOf(Props(new Supervisor), "Supervisor") + offset = fetchLastProcessedOffset() + initLock.countDown() + } + + override def stop(): Unit = { + actorSupervisor ! PoisonPill + Persistence.close() + actorSystem.shutdown() + actorSystem.awaitTermination() + } + + override def getOffset: Option[Offset] = { + // Read the volatile field once so the check and the returned value agree. + val current = offset + if (current == 0) { + None + } else { + Some(LongOffset(current)) + } + } + + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { + val startIndex = toLongOffset(start.getOrElse(LongOffset(0L))).offset.toInt + val endIndex = toLongOffset(end).offset.toInt + val data: ArrayBuffer[(String, Timestamp)] = ArrayBuffer.empty + + ((startIndex + 1) to endIndex).foreach { id => + val element: (String, Timestamp) = messages.getOrElse(id, + store.retrieve[(String, Timestamp)](id).orNull) + + if (!Objects.isNull(element)) { + data += element + store.store(id, element) + } + messages.remove(id, element) + } + log.trace(s"Get Batch invoked, ${data.mkString}") + import sqlContext.implicits._ + data.toDF("value", "timestamp") + } + + // Offsets restored from a checkpoint reach the source as SerializedOffset rather than + // LongOffset, so a plain cast would fail on restart; LongOffset.convert handles both. + private def toLongOffset(o: Offset): LongOffset = LongOffset.convert(o).getOrElse( + throw new IllegalArgumentException(s"Unsupported offset type: ${o.getClass.getName}")) +} + +class AkkaStreamSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging { + + override def sourceSchema(sqlContext: SQLContext, schema:
Option[StructType], + providerName: String, parameters: Map[String, String]) + : (String, StructType) = ("akka", AkkaStreamConstants.SCHEMA_DEFAULT) + + override def createSource(sqlContext: SQLContext, metadataPath: String, + schema: Option[StructType], providerName: String, + parameters: Map[String, String]): Source = { + + def e(s: String) = new IllegalArgumentException(s) + + val urlOfPublisher: String = parameters.getOrElse("urlOfPublisher", parameters.getOrElse("path", + throw e( + s"""Please provide url of Publisher actor by specifying path + | or .option("urlOfPublisher",...)""".stripMargin))) + + val persistenceDirPath: String = parameters.getOrElse("persistenceDirPath", + System.getProperty("java.io.tmpdir")) + + // Timestamps are truncated to whole seconds, the precision DATE_FORMAT yields, without + // sharing that non-thread-safe SimpleDateFormat between concurrently running sources. + val messageParserWithTimestamp = (x: String) => { + val nowMillis = Calendar.getInstance().getTimeInMillis + (x, new Timestamp(nowMillis - nowMillis % 1000)) + } + + val persistence = Persistence.getOrCreatePersistenceInstance(persistenceDirPath) + new AkkaStreamSource(urlOfPublisher, persistence, sqlContext, messageParserWithTimestamp) + } + + override def shortName(): String = "akka" +} + +// Holds one RocksDB instance for the whole JVM. NOTE(review): once opened, later calls return +// the existing instance even if they pass a different persistenceDirPath. +object Persistence { + var persistence: RocksDB = _ + + def getOrCreatePersistenceInstance(persistenceDirPath: String): RocksDB = { + if (Objects.isNull(persistence)) { + RocksDB.loadLibrary() + persistence = RocksDB.open(new Options().setCreateIfMissing(true), persistenceDirPath) + } + persistence + } + + def close(): Unit = { + if (!Objects.isNull(persistence)) { + persistence.close() + persistence = null + } + } +} http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala new file mode 100644 index 0000000..9babd82 ---
/dev/null +++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bahir.sql.streaming.akka + +import java.nio.ByteBuffer + +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import org.rocksdb.RocksDB + +import org.apache.spark.serializer.{JavaSerializer, Serializer, SerializerInstance} +import org.apache.spark.SparkConf + +import org.apache.bahir.utils.Logging + + +/** + * Store for messages keyed by their integer id. + */ +trait MessageStore { + + /** Stores `message` under `id` and returns whether the write succeeded. */ + def store[T: ClassTag](id: Int, message: T): Boolean + + /** Retrieves the messages with ids in `[start, end)`; absent ids yield `None`. */ + def retrieve[T: ClassTag](start: Int, end: Int): Seq[Option[T]] + + /** Retrieves the message stored under `id`, if any. */ + def retrieve[T: ClassTag](id: Int): Option[T] + + /** Offset from which a restarted source resumes. */ + def maxProcessedOffset: Int +} + +/** + * [[MessageStore]] backed by a RocksDB instance; keys are the decimal string of the id and + * values are the messages serialized with `serializer`. + */ +private[akka] class LocalMessageStore(val persistentStore: RocksDB, + val serializer: Serializer) + extends MessageStore with Logging { + + val classLoader = Thread.currentThread().getContextClassLoader + + def this(persistentStore: RocksDB, conf: SparkConf) = + this(persistentStore, new JavaSerializer(conf)) + + val serializerInstance: SerializerInstance = serializer.newInstance() + + private def get(id: Int) = persistentStore.get(id.toString.getBytes) + + // NOTE(review): this is RocksDB's latest sequence number, i.e. the number of writes applied to + // the database, not the largest stored id. The two only agree while every id is written exactly + // once -- confirm this is the intended recovery point. + override def maxProcessedOffset: Int =
persistentStore.getLatestSequenceNumber.toInt + + override def store[T: ClassTag](id: Int, message: T): Boolean = { + // Copy only the serialized payload: array() would return the whole backing array, which may + // be larger than the bytes actually written by the serializer. + val buffer = serializerInstance.serialize(message) + val bytes = new Array[Byte](buffer.remaining()) + buffer.get(bytes) + try { + persistentStore.put(id.toString.getBytes(), bytes) + true + } catch { + case NonFatal(e) => + log.warn(s"Failed to store message Id: $id", e) + false + } + } + + override def retrieve[T: ClassTag](start: Int, end: Int): Seq[Option[T]] = { + (start until end).map(x => retrieve(x)) + } + + override def retrieve[T: ClassTag](id: Int): Option[T] = { + // RocksDB returns null for a missing key. + Option(get(id)).map(bytes => + serializerInstance.deserialize[T](ByteBuffer.wrap(bytes), classLoader)) + } +} http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala new file mode 100644 index 0000000..996a0a1 --- /dev/null +++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.bahir.utils + + import java.io.{File, IOException} + import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor} + import java.nio.file.attribute.BasicFileAttributes + + object BahirUtils extends Logging { + /** Recursively deletes `dir`, logging rather than throwing when an entry cannot be deleted. */ + def recursiveDeleteDir(dir: File): Path = { + Files.walkFileTree(dir.toPath, new SimpleFileVisitor[Path]() { + override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = { + try { + Files.delete(file) + } catch { + case scala.util.control.NonFatal(t) => log.warn("Failed to delete", t) + } + FileVisitResult.CONTINUE + } + + override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = { + try { + Files.delete(dir) + } catch { + case scala.util.control.NonFatal(t) => log.warn("Failed to delete", t) + } + FileVisitResult.CONTINUE + } + }) + } + } http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala new file mode 100644 index 0000000..776ed5a --- /dev/null +++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.bahir.utils + + import org.slf4j.LoggerFactory + /** Adds an SLF4J `log` named after the runtime class (an object's trailing `$` removed). */ + trait Logging { + final val log: org.slf4j.Logger = LoggerFactory.getLogger(getClass.getName.stripSuffix("$")) + } http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/test/resources/feeder_actor.conf ---------------------------------------------------------------------- diff --git a/sql-streaming-akka/src/test/resources/feeder_actor.conf b/sql-streaming-akka/src/test/resources/feeder_actor.conf new file mode 100644 index 0000000..9ec210e --- /dev/null +++ b/sql-streaming-akka/src/test/resources/feeder_actor.conf @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# + +akka { + loglevel = "INFO" + actor { + provider = "akka.remote.RemoteActorRefProvider" + } + remote { + enabled-transports = ["akka.remote.netty.tcp"] + netty.tcp { + hostname = "127.0.0.1" + port = 0 + } + log-sent-messages = on + log-received-messages = on + } + loggers.0 = "akka.event.slf4j.Slf4jLogger" + log-dead-letters-during-shutdown = "off" +} http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/sql-streaming-akka/src/test/resources/log4j.properties b/sql-streaming-akka/src/test/resources/log4j.properties new file mode 100644 index 0000000..3706a6e --- /dev/null +++ b/sql-streaming-akka/src/test/resources/log4j.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# Set everything to be logged to the file target/unit-tests.log +log4j.rootCategory=INFO, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=true +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.spark_project.jetty=WARN http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala new file mode 100644 index 0000000..a04dc66 --- /dev/null +++ b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.bahir.sql.streaming.akka + + import java.io.File + + import scala.collection.JavaConverters._ + import scala.collection.mutable + + import org.scalatest.BeforeAndAfter + + import org.apache.spark.{SparkConf, SparkFunSuite} + import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext} + import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp + import org.apache.spark.sql.execution.streaming.LongOffset + + import org.apache.bahir.utils.BahirUtils + + class AkkaStreamSourceSuite extends SparkFunSuite with BeforeAndAfter { + + protected var akkaTestUtils: AkkaTestUtils = _ + protected val tempDir: File = + new File(System.getProperty("java.io.tmpdir") + "/spark-akka-persistence") + + private val conf = new SparkConf().setMaster("local[4]").setAppName("AkkaStreamSourceSuite") + protected val spark = SparkSession.builder().config(conf).getOrCreate() + + akkaTestUtils = new AkkaTestUtils + akkaTestUtils.setup() + // tempDir is created before each test and deleted after it, once Persistence is closed. + before { + tempDir.mkdirs() + } + + after { + Persistence.close() + BahirUtils.recursiveDeleteDir(tempDir) + } + + protected val tmpDir: String = tempDir.getAbsolutePath + + protected def createStreamingDataframe(dir: String = tmpDir): (SQLContext, DataFrame) = { + + val sqlContext: SQLContext = spark.sqlContext + + sqlContext.setConf("spark.sql.streaming.checkpointLocation", dir + "/checkpoint") + + val dataFrame: DataFrame = + sqlContext.readStream.format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider") + .option("urlOfPublisher", akkaTestUtils.getFeederActorUri()) + .option("persistenceDirPath", dir + "/persistence").load() + (sqlContext, dataFrame) + } + } + + class BasicAkkaSourceSuite extends AkkaStreamSourceSuite { + + private def writeStreamResults(sqlContext: SQLContext, dataFrame: DataFrame, + waitDuration: Long): Boolean = { + import sqlContext.implicits._ + val query = dataFrame.as[(String, Timestamp)].writeStream.format("parquet") + .start(s"$tmpDir/parquet/t.parquet") + // Stop the query even on timeout so it cannot keep running into later tests. + try query.awaitTermination(waitDuration) finally query.stop() + } +
+ private def readBackStreamingResults(sqlContext: SQLContext): mutable.Buffer[String] = { + import sqlContext.implicits._ + sqlContext.read.schema(AkkaStreamConstants.SCHEMA_DEFAULT) + .parquet(s"$tmpDir/parquet/t.parquet").as[(String, Timestamp)].map(_._1) + .collectAsList().asScala + } + + test("basic usage") { + val message = "Akka is a reactive framework" + + akkaTestUtils.setMessage(message) + akkaTestUtils.setCountOfMessages(1) + + val (sqlContext: SQLContext, dataFrame: DataFrame) = createStreamingDataframe() + + writeStreamResults(sqlContext, dataFrame, 10000) + + val resultBuffer: mutable.Buffer[String] = readBackStreamingResults(sqlContext) + + assert(resultBuffer.size === 1) + assert(resultBuffer.head === message) + } + + test("Send and receive 100 messages.") { + val message = "Akka is a reactive framework" + + akkaTestUtils.setMessage(message) + akkaTestUtils.setCountOfMessages(100) + + val (sqlContext: SQLContext, dataFrame: DataFrame) = createStreamingDataframe() + + writeStreamResults(sqlContext, dataFrame, 10000) + + val resultBuffer: mutable.Buffer[String] = readBackStreamingResults(sqlContext) + + assert(resultBuffer.size === 100) + assert(resultBuffer.head === message) + } + + test("params not provided") { + val persistenceDirPath = tempDir.getAbsolutePath + "/persistence" + + val provider = new AkkaStreamSourceProvider + val sqlContext: SQLContext = spark.sqlContext + + val parameters = Map("persistenceDirPath" -> persistenceDirPath) + + intercept[IllegalArgumentException] { + provider.createSource(sqlContext, "", None, "", parameters) + } + } + + test("Recovering offset from the last processed offset") { + val persistenceDirPath = tempDir.getAbsolutePath + "/persistence" + val message = "Akka is a reactive framework" + + akkaTestUtils.setMessage(message) + akkaTestUtils.setCountOfMessages(100) + + val (sqlContext: SQLContext, dataFrame: DataFrame) = createStreamingDataframe() + + writeStreamResults(sqlContext, dataFrame,
10000) + + val provider = new AkkaStreamSourceProvider + val parameters = Map("urlOfPublisher" -> akkaTestUtils.getFeederActorUri(), + "persistenceDirPath" -> persistenceDirPath) + + val offset: Long = provider.createSource(sqlContext, "", None, "", parameters) + .getOffset.get.asInstanceOf[LongOffset].offset + assert(offset === 100L) + } + } + + class StressTestAkkaSource extends AkkaStreamSourceSuite { + + // Run with -Xmx1024m + // Default allowed payload size sent to an akka actor is 128000 bytes. + test("Send & Receive messages of size 128000 bytes.") { + + val freeMemory: Long = Runtime.getRuntime.freeMemory() + + log.info(s"Available memory before test run is ${freeMemory / (1024 * 1024)}MB.") + + val noOfMsgs = 124 * 1024 + + val messageBuilder = new mutable.StringBuilder() + for (i <- 0 until noOfMsgs) messageBuilder.append(((i % 26) + 65).toChar) + + val message = messageBuilder.toString() + + akkaTestUtils.setMessage(message) + akkaTestUtils.setCountOfMessages(1) + + val (sqlContext: SQLContext, dataFrame: DataFrame) = createStreamingDataframe() + + import sqlContext.implicits._ + + val query = dataFrame.as[(String, Timestamp)].writeStream + .format("parquet") + .start(s"$tmpDir/parquet/t.parquet") + try query.awaitTermination(25000) finally query.stop() + + val outputMessage = + sqlContext.read.schema(AkkaStreamConstants.SCHEMA_DEFAULT) + .parquet(s"$tmpDir/parquet/t.parquet").as[(String, Timestamp)] + .map(_._1).head() + + assert(outputMessage === message) + } + } http://git-wip-us.apache.org/repos/asf/bahir/blob/889de659/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala new file mode 100644 index 0000000..9cbfc32 --- /dev/null +++
b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + // scalastyle:off println + package org.apache.bahir.sql.streaming.akka + + import scala.collection.mutable + + import akka.actor.{Actor, ActorRef, ActorSystem, ExtendedActorSystem, Props} + import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions} + + import org.apache.bahir.utils.Logging + + /** Runs a feeder actor; each new subscription triggers `count` broadcasts of `message`. */ + class AkkaTestUtils extends Logging { + private val actorSystemName = "feeder-actor-system" + private var actorSystem: ActorSystem = _ + + private val feederActorName = "feederActor" + + private var message: String = _ + private var count = 1 + + // Load from the classpath (URL.getFile breaks on URL-encoded paths) and fail fast if missing. + def getFeederActorConfig(): Config = + ConfigFactory.parseResources(getClass.getClassLoader, "feeder_actor.conf", + ConfigParseOptions.defaults().setAllowMissing(false)) + + def getFeederActorUri(): String = + s"${actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress}" + + s"/user/$feederActorName" + + class FeederActor extends Actor { + + val receivers = new mutable.LinkedHashSet[ActorRef]() + + // Runs synchronously inside receive: `count` rounds of `message` to every current subscriber. + private def sendMessages(): Unit = + (0 until count).foreach(_ => receivers.foreach(_ ! message)) +
+ override def receive: Receive = { + case SubscribeReceiver(receiverActor: ActorRef) => + log.debug(s"received subscribe from ${receiverActor.toString}") + receivers += receiverActor + sendMessages() + + case UnsubscribeReceiver(receiverActor: ActorRef) => + log.debug(s"received unsubscribe from ${receiverActor.toString}") + receivers -= receiverActor + } + } + + def setup(): Unit = { + val feederConf = getFeederActorConfig() + + actorSystem = ActorSystem(actorSystemName, feederConf) + actorSystem.actorOf(Props(new FeederActor), feederActorName) + } + + def shutdown(): Unit = { + actorSystem.shutdown() + } + + def setMessage(message: String): Unit = this.message = message + def setCountOfMessages(messageCount: Int): Unit = count = messageCount + }