http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala b/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala index 7046220..1a1d019 100644 --- a/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala +++ b/examples/streaming/wordcount-java/src/test/scala/io/gearpump/streaming/examples/wordcountjava/WordCountSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,17 +18,19 @@ package io.gearpump.streaming.examples.wordcount +import scala.concurrent.Future +import scala.util.Success + +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} + import io.gearpump.cluster.ClientToMaster.SubmitApplication import io.gearpump.cluster.MasterToClient.SubmitApplicationResult import io.gearpump.cluster.{MasterHarness, TestUtil} import io.gearpump.streaming.examples.wordcountjava.WordCount -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} - -import scala.concurrent.Future -import scala.util.Success -class WordCountSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { +class WordCountSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { before { startActorSystem() @@ -38,7 +40,7 @@ class WordCountSpec extends PropSpec with PropertyChecks with Matchers with Befo shutdownActorSystem() } - override def config = TestUtil.DEFAULT_CONFIG + protected override def config = TestUtil.DEFAULT_CONFIG property("WordCount should succeed to submit application with required arguments") { val requiredArgs = Array.empty[String] @@ -47,7 +49,9 @@ class WordCountSpec extends PropSpec with PropertyChecks with Matchers with Befo val args = requiredArgs - Future {WordCount.main(masterConfig, args)} + Future { + WordCount.main(masterConfig, args) + } masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) masterReceiver.reply(SubmitApplicationResult(Success(0)))
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala index e8a3136..387bc75 100644 --- a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala +++ b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Split.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,18 +20,18 @@ package io.gearpump.streaming.examples.wordcount import java.util.concurrent.TimeUnit -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} import io.gearpump.Message import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.task.{StartTime, Task, TaskContext} -class Split(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - import taskContext.{output, self} +class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + import taskContext.output - override def onStart(startTime : StartTime) : Unit = { + override def onStart(startTime: StartTime): Unit = { self ! Message("start") } - override def onNext(msg : Message) : Unit = { + override def onNext(msg: Message): Unit = { Split.TEXT_TO_SPLIT.lines.foreach { line => line.split("[\\s]+").filter(_.nonEmpty).foreach { msg => output(new Message(msg, System.currentTimeMillis())) @@ -39,7 +39,8 @@ class Split(taskContext : TaskContext, conf: UserConfig) extends Task(taskContex } import scala.concurrent.duration._ - taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self ! Message("continue", System.currentTimeMillis())) + taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self ! + Message("continue", System.currentTimeMillis())) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala index 10cc172..6560066 100644 --- a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala +++ b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/Sum.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,47 +19,47 @@ package io.gearpump.streaming.examples.wordcount import java.util.concurrent.TimeUnit +import scala.collection.mutable +import scala.concurrent.duration.FiniteDuration import akka.actor.Cancellable -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} + import io.gearpump.Message import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.task.{StartTime, Task, TaskContext} -import scala.collection.mutable -import scala.concurrent.duration.FiniteDuration - -class Sum (taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - private[wordcount] val map : mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]() +class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + private[wordcount] val map: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]() - private[wordcount] var wordCount : Long = 0 - private var snapShotTime : Long = System.currentTimeMillis() - private var snapShotWordCount : Long = 0 + private[wordcount] var wordCount: Long = 0 + private var snapShotTime: Long = System.currentTimeMillis() + private var snapShotWordCount: Long = 0 - private var scheduler : Cancellable = null + private var scheduler: Cancellable = null - override def onStart(startTime : StartTime) : Unit = { + override def onStart(startTime: StartTime): Unit = { scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS), new FiniteDuration(30, TimeUnit.SECONDS))(reportWordCount) } - override def onNext(msg : Message) : Unit = { - if (null == msg) { - return + override def onNext(msg: Message): Unit = { + if (null != msg) { + val current = map.getOrElse(msg.msg.asInstanceOf[String], 0L) + wordCount += 1 + map.put(msg.msg.asInstanceOf[String], current + 1) } - val current = map.getOrElse(msg.msg.asInstanceOf[String], 0L) - wordCount += 1 - map.put(msg.msg.asInstanceOf[String], current + 1) } - override def onStop() : Unit = { + override def onStop(): Unit = { if (scheduler != null) { scheduler.cancel() } } - def reportWordCount() : Unit = { - val current : Long = System.currentTimeMillis() - LOG.info(s"Task ${taskContext.taskId} Throughput: ${(wordCount - snapShotWordCount, (current - snapShotTime) / 1000)} (words, second)") + def reportWordCount(): Unit = { + val current: Long = System.currentTimeMillis() + LOG.info(s"Task ${taskContext.taskId} Throughput:" + + s" ${(wordCount - snapShotWordCount, (current - snapShotTime) / 1000)} (words, second)") snapShotWordCount = wordCount snapShotTime = current } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala index 22f8ac6..f23604a 100644 --- a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,16 +18,18 @@ package io.gearpump.streaming.examples.wordcount -import io.gearpump.cluster.embedded.{EmbeddedCluster} -import io.gearpump.streaming.{StreamApplication, Processor} +import org.slf4j.Logger + import io.gearpump.cluster.UserConfig import io.gearpump.cluster.client.ClientContext +import io.gearpump.cluster.embedded.EmbeddedCluster import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} import io.gearpump.partitioner.HashPartitioner +import io.gearpump.streaming.{Processor, StreamApplication} import io.gearpump.util.Graph.Node import io.gearpump.util.{AkkaApp, Graph, LogUtil} -import org.slf4j.Logger +/** Same WordCount with low level Processor Graph syntax */ object WordCount extends AkkaApp with ArgumentsParser { private val LOG: Logger = LogUtil.getLogger(getClass) val RUN_FOR_EVER = -1 @@ -36,10 +38,11 @@ object WordCount extends AkkaApp with ArgumentsParser { "split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)), "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)), "debug" -> CLIOption[Boolean]("<true|false>", required = false, defaultValue = Some(false)), - "sleep" -> CLIOption[Int]("how many seconds to sleep for debug mode", required = false, defaultValue = Some(30)) - ) + "sleep" -> CLIOption[Int]("how many seconds to sleep for debug mode", required = false, + defaultValue = Some(30)) + ) - def application(config: ParseResult) : StreamApplication = { + def application(config: ParseResult): StreamApplication = { val splitNum = config.getInt("split") val sumNum = config.getInt("sum") val split = Processor[Split](splitNum) @@ -58,7 +61,7 @@ object WordCount extends AkkaApp with ArgumentsParser { val localCluster = if (debugMode) { val cluster = new EmbeddedCluster(akkaConf: Config) - cluster.start + cluster.start() Some(cluster) } else { None @@ -73,11 +76,11 @@ object WordCount extends AkkaApp with ArgumentsParser { context.submit(app) if (debugMode) { - Thread.sleep(sleepSeconds * 1000) // sleep for 30 seconds for debugging. + Thread.sleep(sleepSeconds * 1000) // Sleeps for 30 seconds for debugging. } context.close() - localCluster.map(_.stop) + localCluster.map(_.stop()) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala index cc516db..ab8e8d0 100644 --- a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -24,7 +24,8 @@ import io.gearpump.streaming.dsl.StreamApp import io.gearpump.streaming.dsl.StreamApp._ import io.gearpump.util.AkkaApp -object WordCount extends AkkaApp with ArgumentsParser{ +/** Same WordCount with High level DSL syntax */ +object WordCount extends AkkaApp with ArgumentsParser { override val options: Array[(String, CLIOption[Any])] = Array.empty http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SplitSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SplitSpec.scala b/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SplitSpec.scala index c44836c..21e498e 100644 --- a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SplitSpec.scala +++ b/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SplitSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,17 +17,19 @@ */ package io.gearpump.streaming.examples.wordcount +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.ActorSystem import akka.testkit.TestProbe -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.task.StartTime -import io.gearpump.Message -import io.gearpump.cluster.{TestUtil, UserConfig} import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalatest.{Matchers, WordSpec} -import scala.language.postfixOps +import io.gearpump.Message +import io.gearpump.cluster.{TestUtil, UserConfig} +import io.gearpump.streaming.MockUtil +import io.gearpump.streaming.task.StartTime class SplitSpec extends WordSpec with Matchers { @@ -36,11 +38,11 @@ class SplitSpec extends WordSpec with Matchers { val taskContext = MockUtil.mockTaskContext - implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + implicit val system: ActorSystem = ActorSystem("test", TestUtil.DEFAULT_CONFIG) val mockTaskActor = TestProbe() - //mock self ActorRef + // Mock self ActorRef when(taskContext.self).thenReturn(mockTaskActor.ref) val conf = UserConfig.empty @@ -48,13 +50,13 @@ class SplitSpec extends WordSpec with Matchers { split.onStart(StartTime(0)) mockTaskActor.expectMsgType[Message] - val expectedWordCount = Split.TEXT_TO_SPLIT.split("""[\s\n]+""").filter(_.nonEmpty).length + val expectedWordCount = Split.TEXT_TO_SPLIT.split( """[\s\n]+""").filter(_.nonEmpty).length split.onNext(Message("next")) verify(taskContext, times(expectedWordCount)).output(anyObject()) - system.shutdown() - system.awaitTermination() + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SumSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SumSpec.scala b/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SumSpec.scala index 2303ce9..48a3fa9 100644 --- a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SumSpec.scala +++ b/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SumSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,20 +17,21 @@ */ package io.gearpump.streaming.examples.wordcount -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.task.StartTime -import io.gearpump.Message -import io.gearpump.cluster.UserConfig import org.scalacheck.Gen import org.scalatest.prop.PropertyChecks import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} +import io.gearpump.Message +import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.MockUtil +import io.gearpump.streaming.task.StartTime + class SumSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter { val stringGenerator = Gen.alphaStr var wordcount = 0 - property("Sum should calculate the frequency of the word correctly"){ + property("Sum should calculate the frequency of the word correctly") { val taskContext = MockUtil.mockTaskContext http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/WordCountSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/WordCountSpec.scala b/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/WordCountSpec.scala index 7fd8c84..63b2312 100644 --- a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/WordCountSpec.scala +++ b/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/WordCountSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,17 +18,18 @@ package io.gearpump.streaming.examples.wordcount -import io.gearpump.cluster.ClientToMaster.{ShutdownApplication, SubmitApplication} -import io.gearpump.cluster.MasterToClient.{ShutdownApplicationResult, SubmitApplicationResult} -import io.gearpump.cluster.{MasterHarness, TestUtil} -import io.gearpump.util.Util +import scala.concurrent.Future +import scala.util.Success + import org.scalatest.prop.PropertyChecks import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} -import scala.util.{Success, Try} -import scala.concurrent.Future +import io.gearpump.cluster.ClientToMaster.SubmitApplication +import io.gearpump.cluster.MasterToClient.SubmitApplicationResult +import io.gearpump.cluster.{MasterHarness, TestUtil} -class WordCountSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { +class WordCountSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { before { startActorSystem() @@ -38,7 +39,7 @@ class WordCountSpec extends PropSpec with PropertyChecks with Matchers with Befo shutdownActorSystem() } - override def config = TestUtil.DEFAULT_CONFIG + protected override def config = TestUtil.DEFAULT_CONFIG property("WordCount should succeed to submit application with required arguments") { val requiredArgs = Array.empty[String] @@ -57,7 +58,9 @@ class WordCountSpec extends PropSpec with PropertyChecks with Matchers with Befo val args = requiredArgs ++ optionalArgs - Future {WordCount.main(masterConfig, args)} + Future { + WordCount.main(masterConfig, args) + } masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) masterReceiver.reply(SubmitApplicationResult(Success(0))) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/README.md ---------------------------------------------------------------------- diff --git a/experiments/akkastream/README.md b/experiments/akkastream/README.md index fe04554..7c9a316 100644 --- a/experiments/akkastream/README.md +++ b/experiments/akkastream/README.md @@ -1,2 +1,4 @@ Akka Stream -========= \ No newline at end of file +========= + +TODO: This directory is obsolte. Working on updating it to Akka 2.4.3. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala b/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala index 42023c7..d2b328d 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -27,7 +27,7 @@ import scala.concurrent.ExecutionContextExecutor * [[materialize]] accepts a [[ModuleGraph]] instead of a RunnableGraph. * * @see [[ModuleGraph]] for the difference between RunnableGraph and - * [[ModuleGraph]] + * [[ModuleGraph]] * */ abstract class BaseMaterializer extends akka.stream.Materializer { @@ -36,7 +36,6 @@ abstract class BaseMaterializer extends akka.stream.Materializer { override implicit def executionContext: ExecutionContextExecutor = throw new UnsupportedOperationException() - def materialize[Mat](graph: ModuleGraph[Mat]): Mat override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = { @@ -44,5 +43,5 @@ abstract class BaseMaterializer extends akka.stream.Materializer { materialize(graph) } - def shutdown: Unit + def shutdown(): Unit } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala index bb23586..8851083 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -24,8 +24,8 @@ package akka.stream -import _root_.io.gearpump.util -import _root_.io.gearpump.util.Graph +import scala.collection.mutable + import akka.stream.Attributes.Attribute import akka.stream.ModuleGraph.Edge import akka.stream.gearpump.util.MaterializedValueOps @@ -33,7 +33,8 @@ import akka.stream.impl.StreamLayout._ import akka.stream.impl._ import akka.stream.{Graph => AkkaGraph} -import scala.collection.mutable +import _root_.io.gearpump.util +import _root_.io.gearpump.util.Graph /** * @@ -114,7 +115,7 @@ object ModuleGraph { def apply[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): ModuleGraph[Mat] = { val topLevel = runnableGraph.module val factory = new ModuleGraphFactory(topLevel) - val (graph, mat) = factory.create() + val (graph, mat) = factory.create() new ModuleGraph(graph, mat) } @@ -208,7 +209,7 @@ object ModuleGraph { val mat = resolveMaterialized(module.materializedValueComputation, materializedValues) - materializedValueSources.foreach{module => + materializedValueSources.foreach { module => val matAttribute = new MaterializedValueSourceAttribute(mat) val copied = copyAtomicModule(module, parentAttributes and Attributes(matAttribute)) assignPort(module.shape.outlet, (copied.shape.outlet, copied)) @@ -227,10 +228,10 @@ object ModuleGraph { } private def resolveMaterialized(matNode: MaterializedValueNode, materializedValues: collection.Map[Module, MaterializedValueNode]): MaterializedValueNode = matNode match { - case Atomic(m) => materializedValues(m) + case Atomic(m) => materializedValues(m) case Combine(f, d1, d2) => Combine(f, resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues)) - case Transform(f, d) => Transform(f, resolveMaterialized(d, materializedValues)) - case Ignore => Ignore + case Transform(f, d) => Transform(f, resolveMaterialized(d, materializedValues)) + case Ignore => Ignore } final protected def assignPort(in: InPort, subscriber: (InPort, Module)): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala index 8f36bea..50c4450 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -28,7 +28,7 @@ object GearAttributes { * @param count * @return */ - def count(count: Int): Attributes = Attributes(ParallismAttribute(count)) + def count(count: Int): Attributes = Attributes(ParallismAttribute(count)) /** * Define we want to render this module locally. @@ -52,7 +52,7 @@ object GearAttributes { def location(attrs: Attributes): Location = { attrs.attributeList.foldLeft(Local: Location) { (s, attr) => attr match { - case LocationAttribute(location) => location + case LocationAttribute(location) => location case other => s } } @@ -66,7 +66,7 @@ object GearAttributes { def count(attrs: Attributes): Int = { attrs.attributeList.foldLeft(1) { (s, attr) => attr match { - case ParallismAttribute(count) => count + case ParallismAttribute(count) => count case other => s } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala index 2bdeb0f..a11d7cb 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,31 +20,32 @@ package akka.stream.gearpump import akka.actor.ActorSystem import akka.stream._ +import akka.stream.gearpump.graph.GraphCutter.Strategy import akka.stream.gearpump.graph.LocalGraph.LocalGraphMaterializer import akka.stream.gearpump.graph.RemoteGraph.RemoteGraphMaterializer -import akka.stream.gearpump.graph.{RemoteGraph, SubGraphMaterializer, LocalGraph, GraphCutter} -import akka.stream.gearpump.graph.GraphCutter.Strategy +import akka.stream.gearpump.graph.{GraphCutter, LocalGraph, RemoteGraph, SubGraphMaterializer} import akka.stream.impl.StreamLayout.Module /** * * [[GearpumpMaterializer]] allows you to render akka-stream DSL as a Gearpump * streaming application. If some module cannot be rendered remotely in Gearpump - * Cluster, then it will use local Actor materializer as fallback to materialize + * Cluster, then it uses local Actor materializer as fallback to materialize * the module locally. * * User can custom a [[Strategy]] to determinie which module should be rendered * remotely, and which module should be rendered locally. * * @see [[GraphCutter]] to find out how we cut the [[ModuleGraph]] to two parts, - * and materialize them seperately. + * and materialize them seperately. * * @param system * @param strategy * @param useLocalCluster whether to use built-in in-process local cluster */ -class GearpumpMaterializer(system: ActorSystem, strategy: Strategy = GraphCutter.AllRemoteStrategy, useLocalCluster: Boolean = true) - extends BaseMaterializer { +class GearpumpMaterializer(system: ActorSystem, strategy: Strategy = GraphCutter.AllRemoteStrategy, + useLocalCluster: Boolean = true) + extends BaseMaterializer { private val subMaterializers: Map[Class[_], SubGraphMaterializer] = Map( classOf[LocalGraph] -> new LocalGraphMaterializer(system), @@ -53,18 +54,18 @@ class GearpumpMaterializer(system: ActorSystem, strategy: Strategy = GraphCutter override def materialize[Mat](graph: ModuleGraph[Mat]): Mat = { val subGraphs = new GraphCutter(strategy).cut(graph) - val matValues = subGraphs.foldLeft(Map.empty[Module, Any]){(map, subGraph) => + val matValues = subGraphs.foldLeft(Map.empty[Module, Any]) { (map, subGraph) => val materializer = subMaterializers(subGraph.getClass) map ++ materializer.materialize(subGraph, map) } graph.resolve(matValues) } - override def shutdown: Unit = { - subMaterializers.values.foreach(_.shutdown) + override def shutdown(): Unit = { + subMaterializers.values.foreach(_.shutdown()) } } -object GearpumpMaterializer{ - def apply(system: ActorSystem) = new GearpumpMaterializer(system) +object GearpumpMaterializer { + def apply(system: ActorSystem): GearpumpMaterializer = new GearpumpMaterializer(system) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala index d466595..7808b52 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,24 +18,26 @@ package akka.stream.gearpump.example +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.{Actor, ActorSystem, Props} import akka.stream.gearpump.GearpumpMaterializer import akka.stream.gearpump.graph.GraphCutter import akka.stream.scaladsl.{Sink, Source} -import io.gearpump.cluster.ClusterConfig /** - * This tests how the [[GearpumpMaterializer]] materializes different partials of Graph - * to different runtime. - * - * In this test, source module and sink module will be materialized locally, - * Other transformation module will be materialized remotely in Gearpump - * streaming Application. - * - * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar - * - * - */ + * This tests how the [[GearpumpMaterializer]] materializes different partials of Graph + * to different runtime. + * + * In this test, source module and sink module are materialized locally, + * Other transformation module are materialized remotely in Gearpump + * streaming Application. + * + * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar + * + * + */ object Test { def main(args: Array[String]): Unit = { @@ -47,18 +49,21 @@ object Test { val echo = system.actorOf(Props(new Echo())) val sink = Sink.actorRef(echo, "COMPLETE") - val source = Source(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")) - source.filter(_.startsWith("red")).fold("Items:"){(a, b) => + val source = Source(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", + "blue sky")) + source.filter(_.startsWith("red")).fold("Items:") { (a, b) => a + "|" + b }.map("I want to order item: " + _).runWith(sink) - system.awaitTermination() + Await.result(system.whenTerminated, Duration.Inf) } class Echo extends Actor { def receive: Receive = { case any: AnyRef => + // scalastyle:off println println("Confirm received: " + any) + // scalastyle:on println } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala index 18d51a9..2426f5f 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,19 +18,21 @@ package akka.stream.gearpump.example +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.{Actor, ActorSystem, Props} import akka.stream.ActorMaterializer import akka.stream.gearpump.GearpumpMaterializer import akka.stream.gearpump.scaladsl.{GearSink, GearSource} -import akka.stream.scaladsl.{Flow, FlowGraph, Sink, Source} -import io.gearpump.cluster.ClusterConfig +import akka.stream.scaladsl.{Flow, Sink, Source} /** - * - * This tests how different Materializers can be used together in an explicit way. - * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar - * - */ + * + * This tests how different Materializers can be used together in an explicit way. + * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar + * + */ object Test2 { def main(args: Array[String]): Unit = { @@ -52,13 +54,12 @@ object Test2 { val externalSink = Sink.actorRef(echo, "COMPLETE") val graph = FlowGraph.closed() { implicit b => - import FlowGraph.Implicits._ externalSource ~> Sink(entry) Source(exit) ~> externalSink } graph.run()(actorMaterializer) - system.awaitTermination() + Await.result(system.whenTerminated, Duration.Inf) } class Echo extends Actor { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala index 1d4afed..b1bf01c 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,17 +18,20 @@ package akka.stream.gearpump.example +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.{Actor, ActorSystem, Props} import akka.stream.gearpump.GearpumpMaterializer import akka.stream.gearpump.scaladsl.GearSource import akka.stream.scaladsl.Sink -import io.gearpump.cluster.ClusterConfig + import io.gearpump.streaming.dsl.CollectionDataSource /** - * read from remote and write to local - * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar - */ + * read from remote and write to local + * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar + */ object Test3 { def main(args: Array[String]): Unit = { @@ -44,7 +47,7 @@ object Test3 { val source = GearSource.from[String](sourceData) source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink) - system.awaitTermination() + Await.result(system.whenTerminated, Duration.Inf) } class Echo extends Actor { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala index b97d329..b1e2fcb 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,17 +18,20 @@ package akka.stream.gearpump.example +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.ActorSystem import akka.stream.gearpump.GearpumpMaterializer import akka.stream.gearpump.scaladsl.GearSink import akka.stream.scaladsl.Source -import io.gearpump.cluster.ClusterConfig + import io.gearpump.streaming.dsl.LoggerSink /** - * read from local and write to remote - * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar - */ + * read from local and write to remote + * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar + */ object Test4 { def main(args: Array[String]): Unit = { @@ -42,6 +45,6 @@ object Test4 { val source = Source(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")) source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink) - system.awaitTermination() + Await.result(system.whenTerminated, Duration.Inf) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala index a9da578..052c018 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,15 +18,17 @@ package akka.stream.gearpump.example +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.{Actor, ActorSystem, Props} import akka.stream.gearpump.GearpumpMaterializer import akka.stream.gearpump.graph.GraphCutter -import akka.stream.scaladsl.{Unzip, Source, FlowGraph, Sink} -import io.gearpump.cluster.ClusterConfig +import akka.stream.scaladsl.{Sink, Source, Unzip} /** - test fanout - */ +test fanout + */ object Test5 { def main(args: Array[String]): Unit = { @@ -42,7 +44,6 @@ object Test5 { val source = Source(List(("male", "24"), ("female", "23"))) val graph = FlowGraph.closed() { implicit b => - import FlowGraph.Implicits._ val unzip = b.add(Unzip[String, String]()) val sink1 = Sink.actorRef(echo, "COMPLETE") @@ -55,7 +56,7 @@ object Test5 { graph.run() - system.awaitTermination() + Await.result(system.whenTerminated, Duration.Inf) } class Echo extends Actor { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala index 8e9ccf6..38b40b7 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,17 +18,20 @@ package akka.stream.gearpump.example +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.{Actor, ActorSystem, Props} import akka.stream.gearpump.GearpumpMaterializer -import akka.stream.gearpump.scaladsl.{Reduce, GroupBy, GearSource} +import akka.stream.gearpump.scaladsl.GearSource import akka.stream.scaladsl.Sink -import io.gearpump.streaming.dsl.CollectionDataSource +import io.gearpump.streaming.dsl.CollectionDataSource /** - * WordCount example - * Test GroupBy - */ + * WordCount example + * Test GroupBy + */ import akka.stream.gearpump.scaladsl.Implicits._ @@ -45,14 +48,14 @@ object Test6 { val sink = Sink.actorRef(echo, "COMPLETE") val sourceData = new CollectionDataSource(List("this is a good start", "this is a good time", "time to start", "congratulations", "green plant", "blue sky")) val source = GearSource.from[String](sourceData) - source.mapConcat{line => + source.mapConcat { line => line.split(" ").toList - }.groupBy2(x=>x).map(word => (word, 1)) - .reduce({(a, b) => + }.groupBy2(x => x).map(word => (word, 1)) + .reduce({ (a, b) => (a._1, a._2 + b._2) }).log("word-count").runWith(sink) - system.awaitTermination() + Await.result(system.whenTerminated, Duration.Inf) } class Echo extends Actor { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala index 915624f..c2d0417 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,26 +20,26 @@ package akka.stream.gearpump.example import java.io.{File, FileInputStream} import java.util.zip.GZIPInputStream +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.util.{Failure, Success, Try} import akka.actor.ActorSystem -import akka.stream.gearpump.{GearAttributes, GearpumpMaterializer} import akka.stream.gearpump.graph.GraphCutter -import akka.stream.io.{Framing, InputStreamSource} +import akka.stream.gearpump.{GearAttributes, GearpumpMaterializer} import akka.stream.scaladsl._ import akka.util.ByteString -import io.gearpump.cluster.main.{CLIOption, ArgumentsParser} -import io.gearpump.util.AkkaApp import org.json4s.JsonAST.JString -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success, Try} +import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import io.gearpump.util.AkkaApp /** * this example is ported from http://engineering.intenthq.com/2015/06/wikidata-akka-streams/ * which showcases running Akka Streams DSL across JVMs on Gearpump * * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala_2.11/akkastream-${VERSION}-SNAPSHOT-assembly.jar - * -input wikidata-${DATE}-all.json.gz -languages en,de + * -input wikidata-${DATE}-all.json.gz -languages en,de * * (Note: Wikipedia data can be downloaded from https://dumps.wikimedia.org/wikidatawiki/entities/) * @@ -66,11 +66,10 @@ object WikipediaApp extends ArgumentsParser with AkkaApp { val g = FlowGraph.closed(count) { implicit b => sinkCount => { - import FlowGraph.Implicits._ val broadcast = b.add(Broadcast[WikidataElement](2)) elements ~> broadcast ~> logEveryNSink(1000) - broadcast ~> checkSameTitles(langs.toSet) ~> sinkCount + broadcast ~> checkSameTitles(langs.toSet) ~> sinkCount } } @@ -79,9 +78,9 @@ object WikipediaApp extends ArgumentsParser with AkkaApp { case Success((t, f)) => printResults(t, f) case Failure(tr) => println("Something went wrong") } - system.shutdown() + system.terminate() } - system.awaitTermination() + Await.result(system.whenTerminated, Duration.Inf) } def source(file: File): Source[String, Future[Long]] = { @@ -124,21 +123,21 @@ object WikipediaApp extends ArgumentsParser with AkkaApp { .filter(_.sites.keySet == langs) .map { x => val titles = x.sites.values - titles.forall( _ == titles.head) + titles.forall(_ == titles.head) }.withAttributes(GearAttributes.remote) - def count: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0,0)) { - case ((t, f), true) => (t+1, f) - case ((t, f), false) => (t, f+1) + def count: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0, 0)) { + case ((t, f), true) => (t + 1, f) + case ((t, f), false) => (t, f + 1) } def printResults(t: Int, f: Int) = { - val message = s""" - | Number of items with the same title: $t - | Number of items with the different title: $f - | Ratios: ${t.toDouble / (t + f)} / ${f.toDouble / (t + f)} + val message = + s""" + | Number of items with the same title: $t + | Number of items with the different title: $f + | Ratios: ${t.toDouble / (t + f)} / ${f.toDouble / (t + f)} """.stripMargin println(message) } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala index 8d835a2..ef8c6fa 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -23,15 +23,16 @@ import akka.stream.ModuleGraph.Edge import akka.stream.gearpump.GearAttributes import akka.stream.gearpump.GearAttributes.{Local, Location, Remote} import akka.stream.gearpump.graph.GraphCutter.Strategy -import akka.stream.gearpump.module.{GroupByModule, BridgeModule, DummyModule, GearpumpTaskModule, SinkBridgeModule, SourceBridgeModule} -import akka.stream.impl.Stages.{TimerTransform, DirectProcessor} +import akka.stream.gearpump.module.{BridgeModule, DummyModule, GearpumpTaskModule, GroupByModule, SinkBridgeModule, SourceBridgeModule} +import akka.stream.impl.Stages.DirectProcessor import akka.stream.impl.StreamLayout.{MaterializedValueNode, Module} -import akka.stream.impl.{MaterializedValueSource, SinkModule, SourceModule} +import akka.stream.impl.{SinkModule, SourceModule} + import io.gearpump.util.Graph /** * - * GraphCutter is used to decide which part will be rendered locally + * GraphCutter is used to decide which part is rendered locally * and which part should be rendered remotely. * * We will cut the graph based on the [[Strategy]] provided. @@ -64,11 +65,12 @@ class GraphCutter(strategy: Strategy) { doCut(graph, tags, moduleGraph.mat) } - private def doCut(graph: Graph[Module, Edge], tags: Map[Module, Location], mat: MaterializedValueNode): List[SubGraph] = { + private def doCut(graph: Graph[Module, Edge], tags: Map[Module, Location], + mat: MaterializedValueNode): List[SubGraph] = { val local = Graph.empty[Module, Edge] val remote = Graph.empty[Module, Edge] - graph.vertices.foreach{ module => + graph.vertices.foreach { module => if (tags(module) == Local) { local.addVertex(module) } else { @@ -76,7 +78,7 @@ class GraphCutter(strategy: Strategy) { } } - graph.edges.foreach{ nodeEdgeNode => + graph.edges.foreach { nodeEdgeNode => val (node1, edge, node2) = nodeEdgeNode (tags(node1), tags(node2)) match { case (Local, Local) => @@ -88,7 +90,7 @@ class GraphCutter(strategy: Strategy) { case bridge: BridgeModule[_, _, _] => local.addEdge(node1, edge, node2) case _ => - // create a bridge module in between + // Creates a bridge module in between val bridge = new SourceBridgeModule[AnyRef, AnyRef]() val remoteEdge = Edge(bridge.outPort, edge.to) remote.addEdge(bridge, remoteEdge, node2) @@ -100,7 +102,7 @@ class GraphCutter(strategy: Strategy) { case bridge: BridgeModule[_, _, _] => local.addEdge(node1, edge, node2) case _ => - // create a bridge module in between + // Creates a bridge module in between val bridge = new SinkBridgeModule[AnyRef, AnyRef]() val remoteEdge = Edge(edge.from, bridge.inPort) remote.addEdge(node1, remoteEdge, bridge) @@ -114,14 +116,14 @@ class GraphCutter(strategy: Strategy) { } private def tag(graph: Graph[Module, Edge], strategy: Strategy): Map[Module, Location] = { - graph.vertices.map{vertex => + graph.vertices.map { vertex => vertex -> strategy.apply(vertex) }.toMap } private def removeDummyModule(inputGraph: Graph[Module, Edge]): Graph[Module, Edge] = { val graph = inputGraph.copy - val dummies = graph.vertices.filter {module => + val dummies = graph.vertices.filter { module => module match { case dummy: DummyModule => true @@ -136,7 +138,6 @@ class GraphCutter(strategy: Strategy) { object GraphCutter { - type Strategy = PartialFunction[Module, Location] val BaseStrategy: Strategy = { @@ -145,8 +146,7 @@ object GraphCutter { case task: GearpumpTaskModule => Remote case groupBy: GroupByModule[_, _] => - //TODO: groupBy is not supported by local materializer - // yet + // TODO: groupBy is not supported by local materializer yet Remote case source: SourceModule[_, _] => Local @@ -157,7 +157,7 @@ object GraphCutter { case direct: DirectProcessor => Local case time: TimerTransform => - // render to local as it requires a timer. + // Renders to local as it requires a timer. Local } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala index b9533c4..473f32a 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -26,13 +26,14 @@ import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.Module import akka.stream.impl.{PublisherSource, SubscriberSink} import akka.stream.{Outlet, SinkShape, SourceShape} -import io.gearpump.util.Graph import org.reactivestreams.{Publisher, Subscriber} +import io.gearpump.util.Graph + /** * * [[LocalGraph]] is a [[SubGraph]] of the application DSL Graph, which only - * contain module that can be materialized in local JVM. + * contain module that can be materialized in local JVM. * * @param graph */ @@ -46,7 +47,7 @@ object LocalGraph { */ class LocalGraphMaterializer(system: ActorSystem) extends SubGraphMaterializer { - // create a local materializer + // Creates a local materializer val materializer = LocalMaterializer()(system) /** @@ -55,7 +56,7 @@ object LocalGraph { * @return Materialized Values for each Module after the materialization. */ override def materialize(graph: SubGraph, matValues: Map[Module, Any]): Map[Module, Any] = { - val newGraph: Graph[Module, Edge] = graph.graph.mapVertex{ module => + val newGraph: Graph[Module, Edge] = graph.graph.mapVertex { module => module match { case source: SourceBridgeModule[AnyRef, AnyRef] => val subscriber = matValues(source).asInstanceOf[Subscriber[AnyRef]] @@ -72,7 +73,7 @@ object LocalGraph { materializer.materialize(newGraph, matValues) } - override def shutdown: Unit = { + override def shutdown(): Unit = { materializer.shutdown() } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala index 250c354..5bb9a68 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -25,16 +25,16 @@ import akka.stream.gearpump.module.{SinkBridgeModule, SourceBridgeModule} import akka.stream.gearpump.task.SinkBridgeTask.SinkBridgeTaskClient import akka.stream.gearpump.task.SourceBridgeTask.SourceBridgeTaskClient import akka.stream.impl.StreamLayout.Module -import io.gearpump.cluster.ClusterConfig + import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.embedded.{EmbeddedCluster} -import io.gearpump.streaming.{StreamApplication, ProcessorId} +import io.gearpump.cluster.embedded.EmbeddedCluster +import io.gearpump.streaming.ProcessorId import io.gearpump.util.Graph /** * * [[RemoteGraph]] is a [[SubGraph]] of the application DSL Graph, which only - * contain modules that can be materialized in remote Gearpump cluster. + * contain modules that can be materialized in remote Gearpump cluster. * * @param graph */ @@ -50,7 +50,7 @@ object RemoteGraph { class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem) extends SubGraphMaterializer { private val local = if (useInProcessCluster) { val cluster = EmbeddedCluster() - cluster.start + cluster.start() Some(cluster) } else { None @@ -63,7 +63,7 @@ object RemoteGraph { override def materialize(subGraph: SubGraph, inputMatValues: Map[Module, Any]): Map[Module, Any] = { val graph = subGraph.graph - + if (graph.isEmpty) { inputMatValues } else { @@ -98,9 +98,9 @@ object RemoteGraph { inputMatValues ++ resolve(matValues) } - override def shutdown: Unit = { + override def shutdown(): Unit = { context.close() - local.map(_.stop) + local.map(_.stop()) } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala index 471370d..4364645 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,9 +18,9 @@ package akka.stream.gearpump.graph -import akka.actor.ActorSystem import akka.stream.ModuleGraph.Edge import akka.stream.impl.StreamLayout.Module + import io.gearpump.util.Graph /** @@ -40,11 +40,10 @@ trait SubGraph { def graph: Graph[Module, Edge] } - /** * Materializer for Sub-Graph type */ -trait SubGraphMaterializer{ +trait SubGraphMaterializer { /** * * @param matValues Materialized Values for each module before materialization @@ -53,5 +52,5 @@ trait SubGraphMaterializer{ def materialize(graph: SubGraph, matValues: Map[Module, Any]): Map[Module, Any] - def shutdown: Unit + def shutdown(): Unit } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala index 9c98924..78125c8 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,17 +19,17 @@ package akka.stream.gearpump.materializer import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} +import scala.concurrent.{Await, ExecutionContextExecutor} import akka.actor.{ActorCell, ActorRef, ActorSystem, Deploy, LocalActorRef, PoisonPill, Props, RepointableActorRef} import akka.dispatch.Dispatchers import akka.pattern.ask import akka.stream.ModuleGraph.Edge import akka.stream.impl.StreamLayout.Module -import akka.stream.impl.{FlowNameCounter, StreamSupervisor} -import akka.stream.{ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Attributes, ClosedShape, Graph => AkkaGraph, MaterializationContext, ModuleGraph, Optimizations} -import io.gearpump.util.Graph +import akka.stream.impl.StreamSupervisor +import akka.stream.{ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Attributes, ClosedShape, Graph => AkkaGraph, MaterializationContext, ModuleGraph} -import scala.concurrent.{Await, ExecutionContextExecutor} +import io.gearpump.util.Graph /** * [[LocalMaterializer]] will use local actor to materialize the graph @@ -62,11 +62,11 @@ abstract class LocalMaterializer( import Attributes._ opAttr.attributeList.foldLeft(settings) { (s, attr) => attr match { - case InputBuffer(initial, max) => s.withInputBuffer(initial, max) - case Dispatcher(dispatcher) => s.withDispatcher(dispatcher) + case InputBuffer(initial, max) => s.withInputBuffer(initial, max) + case Dispatcher(dispatcher) => s.withDispatcher(dispatcher) case SupervisionStrategy(decider) => s.withSupervisionStrategy(decider) - case l: LogLevels => s - case Name(_) => s + case l: LogLevels => s + case Name(_) => s case other => s } } @@ -82,22 +82,26 @@ abstract class LocalMaterializer( case ref: LocalActorRef => ref.underlying.attachChild(props.withDispatcher(dispatcher), name, systemService = false) case ref: RepointableActorRef => - if (ref.isStarted) - ref.underlying.asInstanceOf[ActorCell].attachChild(props.withDispatcher(dispatcher), name, systemService = false) - else { + if (ref.isStarted) { + ref.underlying.asInstanceOf[ActorCell].attachChild(props.withDispatcher(dispatcher), + name, systemService = false) + } else { implicit val timeout = ref.system.settings.CreationTimeout - val f = (supervisor ? StreamSupervisor.Materialize(props.withDispatcher(dispatcher), name)).mapTo[ActorRef] + val f = (supervisor ? StreamSupervisor.Materialize(props.withDispatcher(dispatcher), + name)).mapTo[ActorRef] Await.result(f, timeout.duration) } case unknown => - throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]") + throw new IllegalStateException( + s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]") } } - override lazy val executionContext: ExecutionContextExecutor = dispatchers.lookup(settings.dispatcher match { - case Deploy.NoDispatcherGiven => Dispatchers.DefaultDispatcherId - case other => other - }) + override lazy val executionContext: ExecutionContextExecutor = + dispatchers.lookup(settings.dispatcher match { + case Deploy.NoDispatcherGiven => Dispatchers.DefaultDispatcherId + case other => other + }) def materialize(graph: Graph[Module, Edge], inputMatValues: Map[Module, Any]): Map[Module, Any] @@ -109,28 +113,37 @@ abstract class LocalMaterializer( override def actorOf(context: MaterializationContext, props: Props): ActorRef = { val dispatcher = - if (props.deploy.dispatcher == Deploy.NoDispatcherGiven) effectiveSettings(context.effectiveAttributes).dispatcher - else props.dispatcher + if (props.deploy.dispatcher == Deploy.NoDispatcherGiven) { + effectiveSettings(context.effectiveAttributes).dispatcher + } else { + props.dispatcher + } actorOf(props, context.stageName, dispatcher) } } object LocalMaterializer { - def apply(materializerSettings: Option[ActorMaterializerSettings] = None, namePrefix: Option[String] = None, optimizations: Optimizations = Optimizations.none)(implicit system: ActorSystem): LocalMaterializerImpl = { + def apply(materializerSettings: Option[ActorMaterializerSettings] = None, + namePrefix: Option[String] = None, + optimizations: Optimizations = Optimizations.none)(implicit system: ActorSystem) + : LocalMaterializerImpl = { val settings = materializerSettings getOrElse ActorMaterializerSettings(system) apply(settings, namePrefix.getOrElse("flow"), optimizations)(system) } - def apply(materializerSettings: ActorMaterializerSettings, namePrefix: String, optimizations: Optimizations)(implicit system: ActorSystem): LocalMaterializerImpl = { + def apply(materializerSettings: ActorMaterializerSettings, + namePrefix: String, optimizations: Optimizations)(implicit system: ActorSystem) + : LocalMaterializerImpl = { val haveShutDown = new AtomicBoolean(false) new LocalMaterializerImpl( system, materializerSettings, system.dispatchers, - system.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher)), + system.actorOf(StreamSupervisor.props(materializerSettings, + haveShutDown).withDispatcher(materializerSettings.dispatcher)), haveShutDown, FlowNameCounter(system).counter, namePrefix, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala index 2ccc13f..97d8f70 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -27,17 +27,14 @@ import akka.stream.actor.ActorSubscriber import akka.stream.gearpump.materializer.LocalMaterializerImpl.MaterializedModule import akka.stream.gearpump.module.ReduceModule import akka.stream.gearpump.util.MaterializedValueOps -import akka.stream.impl.GenJunctions.{UnzipWithModule, ZipWithModule} -import akka.stream.impl.Junctions.{BalanceModule, BroadcastModule, ConcatModule, FanInModule, FanOutModule, FlexiMergeModule, FlexiRouteModule, JunctionModule, MergeModule, MergePreferredModule} -import akka.stream.impl.Stages.{Fold, DirectProcessor, Identity, StageModule} +import akka.stream.impl.Stages.{DirectProcessor, Fold, StageModule} import akka.stream.impl.StreamLayout.Module -import akka.stream.impl.io.SslTlsCipherActor -import akka.stream.impl.{ActorProcessorFactory, ActorPublisher, Balance, Broadcast, Concat, ExposedPublisher, FairMerge, FanIn, FanOut, FlexiMerge, FlexiRoute, MaterializedValuePublisher, MaterializedValueSource, SinkModule, SourceModule, UnfairMerge, VirtualProcessor} -import akka.stream.io.SslTls.TlsModule -import akka.stream.{ActorMaterializerSettings, Attributes, Graph => AkkaGraph, InPort, MaterializationContext, Materializer, Optimizations, OutPort, Shape} -import io.gearpump.util.Graph +import akka.stream.impl.{ActorProcessorFactory, ActorPublisher, ExposedPublisher, FanIn, FanOut, SinkModule, SourceModule, VirtualProcessor} +import akka.stream.{ActorMaterializerSettings, Attributes, Graph => AkkaGraph, InPort, MaterializationContext, Materializer, OutPort, Shape} import org.reactivestreams.{Processor, Publisher, Subscriber} +import io.gearpump.util.Graph + /** * This materializer is functional equivalent to [[akka.stream.impl.ActorMaterializerImpl]] * @@ -61,14 +58,14 @@ class LocalMaterializerImpl ( optimizations: Optimizations) extends LocalMaterializer( system, settings, dispatchers, supervisor, - haveShutDown, flowNameCounter, namePrefix, optimizations){ + haveShutDown, flowNameCounter, namePrefix, optimizations) { override def materialize(graph: Graph[Module, Edge], inputMatValues: Map[Module, Any]): Map[Module, Any] = { - val materializedGraph = graph.mapVertex{module => + val materializedGraph = graph.mapVertex { module => materializeAtomic(module) } - materializedGraph.edges.foreach{nodeEdgeNode => + materializedGraph.edges.foreach { nodeEdgeNode => val (node1, edge, node2) = nodeEdgeNode val from = edge.from val to = edge.to @@ -77,7 +74,7 @@ class LocalMaterializerImpl ( publisher.subscribe(subscriber) } - val matValues = inputMatValues ++ materializedGraph.vertices.map{vertex => + val matValues = inputMatValues ++ materializedGraph.vertices.map { vertex => (vertex.module, vertex.matValue) }.toMap @@ -139,7 +136,7 @@ class LocalMaterializerImpl ( MaterializedModule(source, mat, outputs = outputs) case reduce: ReduceModule[Any] => - //TODO: remove this after the official akka-stream API support the Reduce Module + //TODO: remove this after the official akka-stream API support the Reduce Module val stage = LocalMaterializerImpl.toFoldModule(reduce) val (processor, mat) = processorFor(stage, effectiveAttributes, effectiveSettings(effectiveAttributes)) val inputs = Map[InPort, Subscriber[_]](stage.inPort -> processor) @@ -177,88 +174,90 @@ class LocalMaterializerImpl ( } private def processorFor(op: StageModule, - effectiveAttributes: Attributes, - effectiveSettings: ActorMaterializerSettings): (Processor[Any, Any], Any) = op match { + effectiveAttributes: Attributes, + effectiveSettings: ActorMaterializerSettings): (Processor[Any, Any], Any) = op match { case DirectProcessor(processorFactory, _) => processorFactory() - case Identity(attr) => (new VirtualProcessor, ()) + case Identity(attr) => (new VirtualProcessor, ()) case _ => val (opprops, mat) = ActorProcessorFactory.props(LocalMaterializerImpl.this, op, effectiveAttributes) ActorProcessorFactory[Any, Any]( actorOf(opprops, stageName(effectiveAttributes), effectiveSettings.dispatcher)) -> mat } -private def materializeJunction( + private def materializeJunction( op: JunctionModule, effectiveAttributes: Attributes, effectiveSettings: ActorMaterializerSettings): MaterializedModule = { - op match { - case fanin: FanInModule => - val (props, inputs, output) = fanin match { + op match { + case fanin: FanInModule => + val (props, inputs, output) = fanin match { - case MergeModule(shape, _) => - (FairMerge.props(effectiveSettings, shape.inSeq.size), shape.inSeq, shape.out) + case MergeModule(shape, _) => + (FairMerge.props(effectiveSettings, shape.inSeq.size), shape.inSeq, shape.out) - case f: FlexiMergeModule[_, Shape] => - val flexi = f.flexi(f.shape) - val shape: Shape = f.shape - (FlexiMerge.props(effectiveSettings, f.shape, flexi), shape.inlets, shape.outlets.head) + case f: FlexiMergeModule[_, Shape] => + val flexi = f.flexi(f.shape) + val shape: Shape = f.shape + (FlexiMerge.props(effectiveSettings, f.shape, flexi), shape.inlets, shape.outlets.head) - case MergePreferredModule(shape, _) => - (UnfairMerge.props(effectiveSettings, shape.inlets.size), shape.preferred +: shape.inSeq, shape.out) + case MergePreferredModule(shape, _) => + (UnfairMerge.props(effectiveSettings, shape.inlets.size), shape.preferred +: shape.inSeq, shape.out) - case ConcatModule(shape, _) => - require(shape.inSeq.size == 2, "currently only supporting concatenation of exactly two inputs") // TODO - (Concat.props(effectiveSettings), shape.inSeq, shape.out) + case ConcatModule(shape, _) => + require(shape.inSeq.size == 2, "currently only supporting concatenation of exactly two inputs") // TODO + (Concat.props(effectiveSettings), shape.inSeq, shape.out) - case zip: ZipWithModule => - (zip.props(effectiveSettings), zip.shape.inlets, zip.outPorts.head) - } + case zip: ZipWithModule => + (zip.props(effectiveSettings), zip.shape.inlets, zip.outPorts.head) + } - val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher) - val publisher = new ActorPublisher[Any](impl) - // Resolve cyclic dependency with actor. This MUST be the first message no matter what. - impl ! ExposedPublisher(publisher) + val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher) + val publisher = new ActorPublisher[Any](impl) + // Resolve cyclic dependency with actor. This MUST be the first message no matter what. + impl ! ExposedPublisher(publisher) - val inputMapping: Map[InPort, Subscriber[_]] = inputs.zipWithIndex.map{ pair => - val (in, id) = pair - (in, FanIn.SubInput[Any](impl, id)) - }.toMap + val inputMapping: Map[InPort, Subscriber[_]] = inputs.zipWithIndex.map { pair => + val (in, id) = pair + (in, FanIn.SubInput[Any](impl, id)) + }.toMap - val outMapping = Map(output -> publisher) - MaterializedModule(fanin, (), inputMapping, outMapping) + val outMapping = Map(output -> publisher) + MaterializedModule(fanin, (), inputMapping, outMapping) - case fanout: FanOutModule => - val (props, in, outs) = fanout match { + case fanout: FanOutModule => + val (props, in, outs) = fanout match { - case r: FlexiRouteModule[t, Shape] => - val flexi = r.flexi(r.shape) - val shape: Shape = r.shape - (FlexiRoute.props(effectiveSettings, r.shape, flexi), shape.inlets.head: InPort, r.shape.outlets) + case r: FlexiRouteModule[t, Shape] => + val flexi = r.flexi(r.shape) + val shape: Shape = r.shape + (FlexiRoute.props(effectiveSettings, r.shape, flexi), shape.inlets.head: InPort, r.shape.outlets) - case BroadcastModule(shape, eagerCancel, _) => - (Broadcast.props(effectiveSettings, eagerCancel, shape.outArray.size), shape.in, shape.outArray.toSeq) + case BroadcastModule(shape, eagerCancel, _) => + (Broadcast.props(effectiveSettings, eagerCancel, shape.outArray.size), shape.in, shape.outArray.toSeq) - case BalanceModule(shape, waitForDownstreams, _) => - (Balance.props(effectiveSettings, shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq) + case BalanceModule(shape, waitForDownstreams, _) => + (Balance.props(effectiveSettings, shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq) - case unzip: UnzipWithModule => - (unzip.props(effectiveSettings), unzip.inPorts.head, unzip.shape.outlets) - } - val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher) - val size = outs.size - def factory(id: Int) = - new ActorPublisher[Any](impl) { override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) } - val publishers = - if (outs.size < 8) Vector.tabulate(size)(factory) - else List.tabulate(size)(factory) - - impl ! FanOut.ExposedPublishers(publishers) - val outputs: Map[OutPort, Publisher[_]] = publishers.iterator.zip(outs.iterator).map { case (pub, out) => - (out, pub) - }.toMap - - val inputs: Map[InPort, Subscriber[_]] = Map(in -> ActorSubscriber[Any](impl)) - MaterializedModule(fanout, (), inputs, outputs) + case unzip: UnzipWithModule => + (unzip.props(effectiveSettings), unzip.inPorts.head, unzip.shape.outlets) + } + val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher) + val size = outs.size + def factory(id: Int) = + new ActorPublisher[Any](impl) { + override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) + } + val publishers = + if (outs.size < 8) Vector.tabulate(size)(factory) + else List.tabulate(size)(factory) + + impl ! FanOut.ExposedPublishers(publishers) + val outputs: Map[OutPort, Publisher[_]] = publishers.iterator.zip(outs.iterator).map { case (pub, out) => + (out, pub) + }.toMap + + val inputs: Map[InPort, Subscriber[_]] = Map(in -> ActorSubscriber[Any](impl)) + MaterializedModule(fanout, (), inputs, outputs) } } @@ -269,11 +268,11 @@ private def materializeJunction( } object LocalMaterializerImpl { - case class MaterializedModule(val module: Module, val matValue: Any, inputs: Map[InPort, Subscriber[_]] = Map.empty[InPort, Subscriber[_]] , outputs: Map[OutPort, Publisher[_]] = Map.empty[OutPort, Publisher[_]]) + case class MaterializedModule(val module: Module, val matValue: Any, inputs: Map[InPort, Subscriber[_]] = Map.empty[InPort, Subscriber[_]], outputs: Map[OutPort, Publisher[_]] = Map.empty[OutPort, Publisher[_]]) def toFoldModule(reduce: ReduceModule[Any]): Fold = { val f = reduce.f - val aggregator = {(zero: Any, input: Any) => + val aggregator = { (zero: Any, input: Any) => if (zero == null) { input } else {
