Repository: bahir
Updated Branches:
  refs/heads/master b3902bac6 -> be1effaaf
[BAHIR-166] Migrate akka sql streaming source to DataSource v2 API

Migrate akka sql streaming source to DataSource v2 API.

Closes #67

Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/be1effaa
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/be1effaa
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/be1effaa

Branch: refs/heads/master
Commit: be1effaaf7cfde28d19e032e038694e01fbf169b
Parents: b3902ba
Author: shimamoto <shimam...@apache.org>
Authored: Thu May 31 17:15:04 2018 +0900
Committer: Luciano Resende <lrese...@apache.org>
Committed: Wed Nov 7 19:28:11 2018 -0800

----------------------------------------------------------------------
 pom.xml                                         |   8 +-
 sql-streaming-akka/README.md                    |   2 +-
 .../sql/streaming/akka/AkkaStreamSource.scala   | 183 ++++++++++++-------
 .../bahir/sql/streaming/akka/LongOffset.scala   |  54 ++++++
 .../bahir/sql/streaming/akka/MessageStore.scala |  18 +-
 .../streaming/akka/AkkaStreamSourceSuite.scala  |  13 +-
 .../sql/streaming/akka/AkkaTestUtils.scala      |   5 +-
 .../streaming/akka/ActorWordCount.scala         |   8 +-
 .../spark/streaming/akka/ActorReceiver.scala    |   5 +-
 .../spark/streaming/akka/AkkaStreamSuite.scala  |   4 +-
 .../streaming/zeromq/ZeroMQWordCount.scala      |   8 +-
 11 files changed, 214 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8282346..13407bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,7 +77,7 @@
   <modules>
     <module>sql-cloudant</module>
     <module>streaming-akka</module>
-    <!-- <module>sql-streaming-akka</module> Disabling akka sql module, until it is updated to run with datasource v2 API. -->
+    <module>sql-streaming-akka</module>
     <module>streaming-mqtt</module>
     <module>sql-streaming-mqtt</module>
     <module>streaming-twitter</module>
@@ -105,7 +105,7 @@
     <mqtt.paho.client>1.1.0</mqtt.paho.client>
     <!-- Streaming Akka connector -->
     <akka.group>com.typesafe.akka</akka.group>
-    <akka.version>2.4.20</akka.version>
+    <akka.version>2.5.12</akka.version>
     <akka_zeromq.version>2.3.16</akka_zeromq.version>
     <protobuf.version>2.5.0</protobuf.version>
@@ -569,7 +569,7 @@
               <include>**/*Suite.java</include>
             </includes>
             <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-            <argLine>-ea -Xmx3g -Xss4m -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
+            <argLine>-Xmx3g -Xss4096k -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
             <environmentVariables>
               <!--
                 Setting SPARK_DIST_CLASSPATH is a simple way to make sure any child processes
@@ -1042,6 +1042,7 @@
       </modules>
     </profile>
 
+    <!--
     <profile>
       <id>scala-2.10</id>
       <activation>
@@ -1124,6 +1125,7 @@
         </plugins>
       </build>
     </profile>
+    -->
 
     <profile>
       <id>test-java-home</id>

http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/sql-streaming-akka/README.md
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/README.md b/sql-streaming-akka/README.md
index 3d9d17b..29685ee 100644
--- a/sql-streaming-akka/README.md
+++ b/sql-streaming-akka/README.md
@@ -45,7 +45,7 @@ Setting values for option `persistenceDirPath` helps in recovering in case of a
 
 ## Configuration options.
 
-This source uses [Akka Actor api](http://doc.akka.io/api/akka/2.4/akka/actor/Actor.html).
+This source uses [Akka Actor api](http://doc.akka.io/api/akka/2.5/akka/actor/Actor.html).
 
 * `urlOfPublisher` The url of Publisher or Feeder actor that the Receiver actor connects to. Set this as the tcp url of the Publisher or Feeder actor.
 * `persistenceDirPath` By default it is used for storing incoming messages on disk.
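With the module re-enabled, end-user code keeps using the same options documented in the README above. A minimal usage sketch — the SparkSession setup, feeder-actor URL, and local paths are illustrative assumptions, not taken from this commit:

```scala
import org.apache.spark.sql.SparkSession

object AkkaSourceUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("akka-source-sketch")
      .getOrCreate()

    // The provider registers the short name "akka" via DataSourceRegister (see shortName() below);
    // the fully qualified class name also works as the format.
    val lines = spark.readStream
      .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
      .option("urlOfPublisher", "akka.tcp://feederActorSystem@127.0.0.1:9999/user/FeederActor")  // assumed URL
      .option("persistenceDirPath", "/tmp/akka-source-persistence")                              // assumed path
      .load()  // default schema: value (string), timestamp (timestamp)

    val query = lines.writeStream
      .format("console")
      .start()
    query.awaitTermination()
  }
}
```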
http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
index 96d892f..3f2101c 100644
--- a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
+++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
@@ -20,16 +20,18 @@ package org.apache.bahir.sql.streaming.akka
 import java.nio.ByteBuffer
 import java.sql.Timestamp
 import java.text.SimpleDateFormat
-import java.util.{Calendar, Objects}
+import java.util
+import java.util.{Calendar, Objects, Optional}
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicInteger
+import javax.annotation.concurrent.GuardedBy
 
+import scala.collection.JavaConverters._
 import scala.collection.concurrent.TrieMap
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
-import scala.concurrent.Future
 import scala.language.postfixOps
-import scala.util.{Failure, Success, Try}
 
 import akka.actor._
 import akka.actor.SupervisorStrategy.{Escalate, Restart}
@@ -38,9 +40,12 @@ import akka.util.Timeout
 import com.typesafe.config.ConfigFactory
 import org.rocksdb.{Options, RocksDB}
 
-import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
-import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 
 import org.apache.bahir.utils.Logging
@@ -87,32 +92,29 @@ private[akka] case class IteratorData(iterator: Iterator[String]) extends ActorR
 private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
 private[akka] object Ack extends ActorReceiverData
 
-class AkkaStreamSource(urlOfPublisher: String,
-                       persistence: RocksDB, sqlContext: SQLContext,
-                       messageParser: String => (String, Timestamp))
-  extends Source with Logging {
+class AkkaMicroBatchReader(urlOfPublisher: String,
+                           persistence: RocksDB,
+                           messageParser: String => (String, Timestamp))
+  extends MicroBatchReader with Logging {
 
-  override def schema: StructType = AkkaStreamConstants.SCHEMA_DEFAULT
+  private val store = new LocalMessageStore(persistence, SparkEnv.get.conf)
 
-  private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
-
-  private val messages = new TrieMap[Int, (String, Timestamp)]()
+  private val messages = new TrieMap[Long, (String, Timestamp)]()
 
   private val initLock = new CountDownLatch(1)
 
-  private var offset = 0
+  @GuardedBy("this")
+  private var currentOffset: LongOffset = LongOffset(-1L)
+
+  @GuardedBy("this")
+  private var lastOffsetCommitted: LongOffset = LongOffset(-1L)
+
+  private var startOffset: Offset = _
+  private var endOffset: Offset = _
 
   private var actorSystem: ActorSystem = _
   private var actorSupervisor: ActorRef = _
 
-  private def fetchLastProcessedOffset(): Int = {
-    Try(store.maxProcessedOffset) match {
-      case Success(x) =>
-        log.info(s"Recovering from last stored offset $x")
-        x
-      case Failure(e) => 0
-    }
-  }
 
   initialize()
 
   private def initialize(): Unit = {
@@ -157,7 +159,7 @@ class AkkaStreamSource(urlOfPublisher: String,
         case data =>
           initLock.await()
-          var temp = offset + 1
+          var temp = currentOffset.offset + 1
           data match {
 
             case IteratorData(iterator) =>
@@ -198,80 +200,133 @@ class AkkaStreamSource(urlOfPublisher: String,
             val workers = context.children
             sender() ! Statistics(n.get(), workers.size, hiccups.get(), workers.mkString("\n"))
         }
-        offset = temp
+        currentOffset = LongOffset(temp)
       }
     }
 
     actorSystem = AkkaStreamConstants.defaultActorSystemCreator()
     actorSupervisor = actorSystem.actorOf(Props(new Supervisor), "Supervisor")
-    offset = fetchLastProcessedOffset()
+    if (store.maxProcessedOffset > 0) {
+      currentOffset = LongOffset(store.maxProcessedOffset)
+    }
     initLock.countDown()
   }
 
+  // This method is only used for unit test
+  private[akka] def getCurrentOffset: LongOffset = {
+    currentOffset.copy()
+  }
+
+
+  override def getEndOffset: Offset = {
+    Option(endOffset).getOrElse(throw new IllegalStateException("end offset not set"))
+  }
+
+  override def getStartOffset: Offset = {
+    Option(startOffset).getOrElse(throw new IllegalStateException("start offset not set"))
+  }
+
+  override def setOffsetRange(start: Optional[Offset],
+                              end: Optional[Offset]): Unit = synchronized {
+    startOffset = start.orElse(LongOffset(-1L))
+    endOffset = end.orElse(currentOffset)
+  }
+
+  override def commit(end: Offset): Unit = synchronized {
+    val newOffset = LongOffset.convert(end).getOrElse(
+      sys.error(s"AkkaMicroBatchReader.commit() received an offset ($end) that did not " +
+        s"originate with an instance of this class")
+    )
+
+    val offsetDiff = newOffset.offset - lastOffsetCommitted.offset
+
+    if (offsetDiff < 0) {
+      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+    }
+
+    (lastOffsetCommitted.offset until newOffset.offset).foreach { x =>
+      messages.remove(x + 1)
+    }
+    lastOffsetCommitted = newOffset
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+    LongOffset(json.toLong)
+  }
+
   override def stop(): Unit = {
     actorSupervisor ! PoisonPill
     Persistence.close()
-    actorSystem.shutdown()
-    actorSystem.awaitTermination()
+    Await.ready(actorSystem.terminate(), Duration.Inf)
   }
 
-  override def getOffset: Option[Offset] = {
-    if (offset == 0) {
-      None
-    } else {
-      Some(LongOffset(offset))
+  override def readSchema(): StructType = AkkaStreamConstants.SCHEMA_DEFAULT
+
+  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = {
+    assert(startOffset != null && endOffset != null,
+      "start offset and end offset should already be set before create read tasks.")
+
+    val (start, end) = synchronized {
+      (LongOffset.convert(startOffset).get.offset + 1, LongOffset.convert(endOffset).get.offset + 1)
+    }
+    val rawList = for (i <- start until end) yield {
+      store.store(i, messages(i))
+      messages(i)
+    }
+
+    val numPartitions = SparkSession.getActiveSession.get.sparkContext.defaultParallelism
+
+    val slices = Array.fill(numPartitions)(new ArrayBuffer[(String, Timestamp)])
+    rawList.zipWithIndex.foreach { case (r, idx) =>
+      slices(idx % numPartitions).append(r)
     }
-  }
 
-  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
-    val startIndex = start.getOrElse(LongOffset(0L)).asInstanceOf[LongOffset].offset.toInt
-    val endIndex = end.asInstanceOf[LongOffset].offset.toInt
-    val data: ArrayBuffer[(String, Timestamp)] = ArrayBuffer.empty
+    (0 until numPartitions).map { i =>
+      val slice = slices(i)
+      new DataReaderFactory[Row] {
+        override def createDataReader(): DataReader[Row] = new DataReader[Row] {
+          private var currentIdx = -1
 
-    ((startIndex + 1) to endIndex).foreach { id =>
-      val element: (String, Timestamp) = messages.getOrElse(id,
-        store.retrieve[(String, Timestamp)](id).orNull)
+          override def next(): Boolean = {
+            currentIdx += 1
+            currentIdx < slice.size
+          }
 
-      if (!Objects.isNull(element)) {
-        data += element
-        store.store(id, element)
+          override def get(): Row = {
+            Row.fromTuple(slice(currentIdx))
+          }
+
+          override def close(): Unit = {}
+        }
       }
-      messages.remove(id, element)
-    }
-    log.trace(s"Get Batch invoked, ${data.mkString}")
-    import sqlContext.implicits._
-    data.toDF("value", "timestamp")
+    }.toList.asJava
   }
-}
 
 class AkkaStreamSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
+}
 
-  override def sourceSchema(sqlContext: SQLContext, schema: Option[StructType],
-                            providerName: String, parameters: Map[String, String])
-    : (String, StructType) = ("akka", AkkaStreamConstants.SCHEMA_DEFAULT)
+class AkkaStreamSourceProvider extends MicroBatchReadSupport with DataSourceRegister with Logging {
 
-  override def createSource(sqlContext: SQLContext, metadataPath: String,
-                            schema: Option[StructType], providerName: String,
-                            parameters: Map[String, String]): Source = {
+  override def shortName(): String = "akka"
 
-    def e(s: String) = new IllegalArgumentException(s)
+  override def createMicroBatchReader(schema: Optional[StructType],
+                                      metadataPath: String,
+                                      options: DataSourceOptions): MicroBatchReader = {
+    val parameters = options.asMap().asScala.toMap
 
-    val urlOfPublisher: String = parameters.getOrElse("urlOfPublisher", parameters.getOrElse("path",
-      throw e(
+    val urlOfPublisher: String = parameters.getOrElse("urlofpublisher", parameters.getOrElse("path",
+      throw new IllegalArgumentException(
        s"""Please provide url of Publisher actor by specifying path
          | or .options("urlOfPublisher",...)""".stripMargin)))
 
-    val persistenceDirPath: String = parameters.getOrElse("persistenceDirPath",
+    val persistenceDirPath: String = parameters.getOrElse("persistencedirpath",
      System.getProperty("java.io.tmpdir"))
 
    val messageParserWithTimestamp = (x: String) =>
      (x, Timestamp.valueOf(AkkaStreamConstants.DATE_FORMAT.format(Calendar.getInstance().getTime)))
 
    val persistence = Persistence.getOrCreatePersistenceInstance(persistenceDirPath)
-    new AkkaStreamSource(urlOfPublisher, persistence, sqlContext, messageParserWithTimestamp)
+    new AkkaMicroBatchReader(urlOfPublisher, persistence, messageParserWithTimestamp)
   }
-
-  override def shortName(): String = "akka"
 }
 
 object Persistence {
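The reader above boils down to three pieces of state: `currentOffset`, advanced by the receiving actor; the `messages` map holding records not yet committed; and `lastOffsetCommitted`, used by `commit()` to prune what Spark has durably processed. A toy, self-contained sketch of that bookkeeping pattern — it is not the Bahir class itself, the names merely mirror the fields above:

```scala
import scala.collection.concurrent.TrieMap

object OffsetBookkeepingSketch {
  final case class LongOffset(offset: Long)

  private val messages = new TrieMap[Long, String]()
  private var currentOffset = LongOffset(-1L)
  private var lastOffsetCommitted = LongOffset(-1L)

  // A newly received record advances the current offset by one.
  def receive(record: String): Unit = synchronized {
    val next = currentOffset.offset + 1
    messages.put(next, record)
    currentOffset = LongOffset(next)
  }

  // A micro-batch covers the half-open offset range (start, end].
  def planBatch(start: LongOffset, end: LongOffset): Seq[String] =
    ((start.offset + 1) to end.offset).map(messages)

  // Committing an offset drops everything up to and including it.
  def commit(end: LongOffset): Unit = synchronized {
    require(end.offset >= lastOffsetCommitted.offset, "offsets committed out of order")
    (lastOffsetCommitted.offset until end.offset).foreach(x => messages.remove(x + 1))
    lastOffsetCommitted = end
  }

  def main(args: Array[String]): Unit = {
    Seq("a", "b", "c").foreach(receive)
    println(planBatch(LongOffset(-1L), currentOffset))  // Vector(a, b, c)
    commit(currentOffset)
    println(messages.isEmpty)                           // true
  }
}
```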
http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/LongOffset.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/LongOffset.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/LongOffset.scala
new file mode 100644
index 0000000..c961487
--- /dev/null
+++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/LongOffset.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.sql.streaming.akka
+
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
+
+/**
+ * @note As of 2.3.0, [[org.apache.spark.sql.execution.streaming.LongOffset]]
+ *       hasn't extended v2 Offset yet. Fix version is 3.0.0. Until then
+ *       this is a required class.
+ * @see SPARK-23092
+ */
+case class LongOffset(offset: Long) extends OffsetV2 {
+
+  override val json = offset.toString
+
+  def +(increment: Long): LongOffset = new LongOffset(offset + increment)
+  def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
+}
+
+object LongOffset {
+
+  /**
+   * LongOffset factory from serialized offset.
+   * @return new LongOffset
+   */
+  def apply(offset: SerializedOffset) : LongOffset = new LongOffset(offset.json.toLong)
+
+  /**
+   * Convert generic Offset to LongOffset if possible.
+   * @return converted LongOffset
+   */
+  def convert(offset: Offset): Option[LongOffset] = offset match {
+    case lo: LongOffset => Some(lo)
+    case so: SerializedOffset => Some(LongOffset(so))
+    case _ => None
+  }
+}
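Because the v2 engine persists offsets as JSON in the checkpoint log, this class only has to round-trip a Long through its `json` field; `deserializeOffset` and `convert` in the reader rely on exactly that. A small sketch using the class as added here (the values are hypothetical, and the Bahir jar is assumed to be on the classpath):

```scala
import org.apache.bahir.sql.streaming.akka.LongOffset

object LongOffsetRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val end = LongOffset(42L)               // offset reached at the end of a batch
    val json = end.json                     // "42" -- what Spark writes to the offset log
    val restored = LongOffset(json.toLong)  // what deserializeOffset() reconstructs
    assert(LongOffset.convert(restored).contains(end))
    println(restored + 1L)                  // LongOffset(43) -- the next batch starts after it
  }
}
```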
http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala
index 9babd82..9b7f910 100644
--- a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala
+++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/MessageStore.scala
@@ -31,13 +31,13 @@ import org.apache.bahir.utils.Logging
 
 trait MessageStore {
 
-  def store[T: ClassTag](id: Int, message: T): Boolean
+  def store[T: ClassTag](id: Long, message: T): Boolean
 
-  def retrieve[T: ClassTag](start: Int, end: Int): Seq[Option[T]]
+  def retrieve[T: ClassTag](start: Long, end: Long): Seq[Option[T]]
 
-  def retrieve[T: ClassTag](id: Int): Option[T]
+  def retrieve[T: ClassTag](id: Long): Option[T]
 
-  def maxProcessedOffset: Int
+  def maxProcessedOffset: Long
 }
 
 private[akka] class LocalMessageStore(val persistentStore: RocksDB,
@@ -51,11 +51,11 @@ private[akka] class LocalMessageStore(val persistentStore: RocksDB,
 
   val serializerInstance: SerializerInstance = serializer.newInstance()
 
-  private def get(id: Int) = persistentStore.get(id.toString.getBytes)
+  private def get(id: Long) = persistentStore.get(id.toString.getBytes)
 
-  override def maxProcessedOffset: Int = persistentStore.getLatestSequenceNumber.toInt
+  override def maxProcessedOffset: Long = persistentStore.getLatestSequenceNumber
 
-  override def store[T: ClassTag](id: Int, message: T): Boolean = {
+  override def store[T: ClassTag](id: Long, message: T): Boolean = {
     val bytes: Array[Byte] = serializerInstance.serialize(message).array()
     try {
       persistentStore.put(id.toString.getBytes(), bytes)
@@ -66,11 +66,11 @@ private[akka] class LocalMessageStore(val persistentStore: RocksDB,
     }
   }
 
-  override def retrieve[T: ClassTag](start: Int, end: Int): Seq[Option[T]] = {
+  override def retrieve[T: ClassTag](start: Long, end: Long): Seq[Option[T]] = {
     (start until end).map(x => retrieve(x))
   }
 
-  override def retrieve[T: ClassTag](id: Int): Option[T] = {
+  override def retrieve[T: ClassTag](id: Long): Option[T] = {
     val bytes = persistentStore.get(id.toString.getBytes)
 
     if (bytes != null) {
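The widening from `Int` to `Long` matters because RocksDB's `getLatestSequenceNumber` is a Long and offsets now travel as `LongOffset` end to end. A self-contained sketch of the same keying pattern using the RocksDB Java client directly — this is not the package-private `LocalMessageStore` itself, and the store path and values are illustrative:

```scala
import org.rocksdb.{Options, RocksDB}

object RocksStoreSketch {
  def main(args: Array[String]): Unit = {
    RocksDB.loadLibrary()
    val db = RocksDB.open(new Options().setCreateIfMissing(true), "/tmp/akka-offset-store")
    try {
      val id: Long = 42L                                    // Long, not Int, after this change
      db.put(id.toString.getBytes, "some message".getBytes) // offsets become string keys
      println(new String(db.get(id.toString.getBytes)))     // some message
      println(db.getLatestSequenceNumber)                   // a monotonically increasing Long
    } finally db.close()
  }
}
```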
http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
index cdf629b..f61b067 100644
--- a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
+++ b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
@@ -18,6 +18,7 @@ package org.apache.bahir.sql.streaming.akka
 
 import java.io.File
+import java.util.Optional
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -27,7 +28,8 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
-import org.apache.spark.sql.execution.streaming.LongOffset
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.types.StructType
 
 import org.apache.bahir.utils.BahirUtils
 
@@ -126,7 +128,8 @@ class BasicAkkaSourceSuite extends AkkaStreamSourceSuite {
     val parameters = Map("persistenceDirPath" -> persistenceDirPath)
 
     intercept[IllegalArgumentException] {
-      provider.createSource(sqlContext, "", None, "", parameters)
+      provider.createMicroBatchReader(
+        Optional.empty[StructType], "", new DataSourceOptions(parameters.asJava))
     }
   }
 
@@ -145,8 +148,10 @@ class BasicAkkaSourceSuite extends AkkaStreamSourceSuite {
     val parameters = Map("urlOfPublisher" -> akkaTestUtils.getFeederActorUri(),
       "persistenceDirPath" -> persistenceDirPath)
 
-    val offset: Long = provider.createSource(sqlContext, "", None, "", parameters)
-      .getOffset.get.asInstanceOf[LongOffset].offset
+    val offset: Long = provider.createMicroBatchReader(
+      Optional.empty[StructType], "", new DataSourceOptions(parameters.asJava))
+      .asInstanceOf[AkkaMicroBatchReader]
+      .getCurrentOffset.offset
     assert(offset === 100L)
   }
 }
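The suite now builds the reader the way the v2 engine does: options go through `DataSourceOptions`, which is case-insensitive and exposes lower-cased keys — which is why the provider looks them up as `urlofpublisher` and `persistencedirpath`. A condensed sketch of that construction path (the feeder URL and directory are placeholders, and a reachable feeder actor is assumed):

```scala
import java.util.Optional

import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.types.StructType

import org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider

object CreateReaderSketch {
  def main(args: Array[String]): Unit = {
    val provider = new AkkaStreamSourceProvider
    val options = new DataSourceOptions(Map(
      "urlOfPublisher" -> "akka.tcp://feederActorSystem@127.0.0.1:9999/user/FeederActor", // placeholder
      "persistenceDirPath" -> "/tmp/akka-test-persistence").asJava)                       // placeholder

    // Spark normally passes the checkpoint metadata path here; the test above passes "".
    val reader = provider.createMicroBatchReader(Optional.empty[StructType], "", options)
    println(reader.readSchema())  // the source's default schema (value, timestamp)
    reader.stop()
  }
}
```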
http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala
index 9cbfc32..f494c0d 100644
--- a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala
+++ b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaTestUtils.scala
@@ -21,6 +21,8 @@ package org.apache.bahir.sql.streaming.akka
 import java.io.File
 
 import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
 import scala.util.Random
 
 import akka.actor.{Actor, ActorRef, ActorSystem, ExtendedActorSystem, Props}
@@ -84,8 +86,7 @@ class AkkaTestUtils extends Logging {
   }
 
   def shutdown(): Unit = {
-//    actorSystem.awaitTermination()
-    actorSystem.shutdown()
+    Await.ready(actorSystem.terminate(), 5.seconds)
   }
 
   def setMessage(message: String): Unit = this.message = message

http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala b/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
index 3a06da8..09df4ba 100644
--- a/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
+++ b/streaming-akka/examples/src/main/scala/org/apache/spark/examples/streaming/akka/ActorWordCount.scala
@@ -15,10 +15,12 @@
  * limitations under the License.
  */
 
-// scalastyle:off println
+// scalastyle:off println awaitresult
 package org.apache.spark.examples.streaming.akka
 
 import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
 import scala.util.Random
 
 import akka.actor.{Props, _}
@@ -118,7 +120,7 @@ object FeederActor {
 
     println("Feeder started as:" + feeder)
 
-    actorSystem.awaitTermination()
+    Await.result(actorSystem.whenTerminated, Duration.Inf)
   }
 }
 
@@ -182,4 +184,4 @@ object ActorWordCount {
     ssc.awaitTermination()
   }
 }
-// scalastyle:on println
+// scalastyle:on println awaitresult

http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
index d30e380..619bde2 100644
--- a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
+++ b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.akka
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.concurrent.Future
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.reflect.ClassTag
@@ -302,7 +302,6 @@ private[akka] class ActorReceiverSupervisor[T: ClassTag](
 
   def onStop(): Unit = {
     actorSupervisor ! PoisonPill
-    actorSystem.shutdown()
-    actorSystem.awaitTermination()
+    Await.ready(actorSystem.terminate(), Duration.Inf)
   }
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala
index e52bf0e..f403afa 100644
--- a/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala
+++ b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.akka
 import java.util.concurrent.ConcurrentLinkedQueue
 
 import scala.collection.JavaConverters._
+import scala.concurrent.Await
 import scala.concurrent.duration._
 
 import akka.actor._
@@ -42,8 +43,7 @@ class AkkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
       ssc = null
     }
     if (actorSystem != null) {
-      actorSystem.shutdown()
-      actorSystem.awaitTermination(30.seconds)
+      Await.ready(actorSystem.terminate(), 30.seconds)
      actorSystem = null
     }
   }
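The recurring change in the receiver, the test utilities, and the suites above is the Akka 2.5 shutdown idiom: `ActorSystem#shutdown` and `#awaitTermination` are gone, and teardown now blocks on the `Future` returned by `terminate()`. A minimal sketch of that pattern (the system name and timeout are arbitrary):

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorSystem

object TerminateSketch {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("throwaway")
    // ... use the system ...
    // terminate() asynchronously stops all actors and returns a Future[Terminated];
    // Await.ready blocks teardown until it completes (or the timeout is hit).
    Await.ready(system.terminate(), 30.seconds)
  }
}
```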
http://git-wip-us.apache.org/repos/asf/bahir/blob/be1effaa/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
----------------------------------------------------------------------
diff --git a/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala b/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
index 535d0fc..00fd815 100644
--- a/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
+++ b/streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala
@@ -15,9 +15,11 @@
  * limitations under the License.
  */
 
-// scalastyle:off println
+// scalastyle:off println awaitresult
 package org.apache.spark.examples.streaming.zeromq
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
 import scala.language.implicitConversions
 
 import akka.actor.ActorSystem
@@ -53,7 +55,7 @@ object SimpleZeroMQPublisher {
       Thread.sleep(1000)
       pubSocket ! ZMQMessage(ByteString(topic) :: messages)
     }
-    acs.awaitTermination()
+    Await.result(acs.whenTerminated, Duration.Inf)
   }
 }
 
@@ -114,4 +116,4 @@ object ZeroMQWordCount {
     ssc.awaitTermination()
   }
 }
-// scalastyle:on println
+// scalastyle:on println awaitresult
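The two example programs use the complementary half of the new API: rather than initiating shutdown, they block until the actor system is terminated elsewhere, which is what the removed `awaitTermination()` used to do; the extra `awaitresult` token in the scalastyle directives presumably just widens the existing suppression to cover the style rule that flags `Await.result`. A minimal sketch of that blocking pattern (the system name is arbitrary):

```scala
import scala.concurrent.Await
import scala.concurrent.duration.Duration

import akka.actor.ActorSystem

object WhenTerminatedSketch {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("feeder")
    // ... start feeder/publisher actors here ...
    // Block the main thread until system.terminate() is called elsewhere
    // (replaces the removed actorSystem.awaitTermination()).
    Await.result(system.whenTerminated, Duration.Inf)
  }
}
```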