Repository: incubator-gearpump Updated Branches: refs/heads/akka-streams bc3940352 -> f1bec6709
update akkastream against latest Graph DSL Author: manuzhang <[email protected]> Closes #97 from manuzhang/akka-streams-new. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/f1bec670 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/f1bec670 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/f1bec670 Branch: refs/heads/akka-streams Commit: f1bec67098b80f9bffe719f885f8d089344a5579 Parents: bc39403 Author: manuzhang <[email protected]> Authored: Wed Oct 12 14:30:49 2016 +0800 Committer: manuzhang <[email protected]> Committed: Wed Oct 12 14:30:49 2016 +0800 ---------------------------------------------------------------------- .../src/main/resources/geardefault.conf | 2 +- .../akkastream/GearpumpMaterializer.scala | 64 ++++++++++---------- .../gearpump/akkastream/example/Test.scala | 3 +- .../akkastream/example/WikipediaApp.scala | 12 ++-- .../materializer/RemoteMaterializerImpl.scala | 40 ++++++------ .../gearpump/akkastream/scaladsl/Api.scala | 2 +- .../gearpump/akkastream/task/Unzip2Task.scala | 2 +- project/Build.scala | 18 +++--- project/Pack.scala | 16 ++--- .../gearpump/streaming/StreamApplication.scala | 2 +- .../apache/gearpump/streaming/dsl/plan/OP.scala | 19 ++++-- 11 files changed, 92 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/resources/geardefault.conf ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/resources/geardefault.conf b/experiments/akkastream/src/main/resources/geardefault.conf index e9da531..56524d4 100644 --- a/experiments/akkastream/src/main/resources/geardefault.conf +++ b/experiments/akkastream/src/main/resources/geardefault.conf @@ -4,5 +4,5 @@ gearpump.serializers { 
"scala.collection.immutable.Map$Map2" = "" } akka { - version = "2.4.10" + version = "2.4.11" } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala index 75dc95a..9ff701c 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala @@ -29,14 +29,13 @@ import akka.stream.impl.Stages.SymbolicGraphStage import akka.stream.impl.StreamLayout._ import akka.stream.impl._ import akka.stream.impl.fusing.{GraphInterpreterShell, GraphStageModule} -import akka.stream.scaladsl.ModuleExtractor import akka.stream.stage.GraphStage import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy import org.apache.gearpump.akkastream.graph.LocalGraph.LocalGraphMaterializer import org.apache.gearpump.akkastream.graph.RemoteGraph.RemoteGraphMaterializer import org.apache.gearpump.akkastream.graph._ -import org.apache.gearpump.akkastream.util.MaterializedValueOps +import org.apache.gearpump.util.{Graph => GGraph} import scala.collection.mutable import scala.concurrent.{ExecutionContextExecutor, Promise} @@ -137,7 +136,7 @@ class GearpumpMaterializer(override val system: ActorSystem, override def logger: LoggingAdapter = Logging.getLogger(system, this) - override def isShutdown: Boolean = system.isTerminated + override def isShutdown: Boolean = system.whenTerminated.isCompleted override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = { import ActorAttributes._ @@ 
-177,7 +176,6 @@ class GearpumpMaterializer(override val system: ActorSystem, Nil) val info = Fusing.aggressive(runnableGraph).module.info - import _root_.org.apache.gearpump.util.{Graph => GGraph} val graph = GGraph.empty[Module, Edge] info.allModules.foreach(module => { @@ -204,33 +202,7 @@ class GearpumpMaterializer(override val system: ActorSystem, }) if(Debug) { - val iterator = graph.topologicalOrderIterator - while (iterator.hasNext) { - val module = iterator.next() - // scalastyle:off println - module match { - case graphStageModule: GraphStageModule => - graphStageModule.stage match { - case symbolicGraphStage: SymbolicGraphStage[_, _, _] => - val symbolicName = symbolicGraphStage.symbolicStage.getClass.getSimpleName - println( - s"${module.getClass.getSimpleName}(${symbolicName})" - ) - case graphStage: GraphStage[_] => - val name = graphStage.getClass.getSimpleName - println( - s"${module.getClass.getSimpleName}(${name})" - ) - case other => - println( - s"${module.getClass.getSimpleName}(${other.getClass.getSimpleName})" - ) - } - case _ => - println(module.getClass.getSimpleName) - } - // scalastyle:on println - } + printGraph(graph) } val subGraphs = GraphPartitioner(strategy).partition(graph) @@ -266,6 +238,36 @@ class GearpumpMaterializer(override val system: ActorSystem, rt.getOrElse(null).asInstanceOf[Mat] } + private def printGraph(graph: GGraph[Module, Edge]): Unit = { + val iterator = graph.topologicalOrderIterator + while (iterator.hasNext) { + val module = iterator.next() + // scalastyle:off println + module match { + case graphStageModule: GraphStageModule => + graphStageModule.stage match { + case symbolicGraphStage: SymbolicGraphStage[_, _, _] => + val symbolicName = symbolicGraphStage.symbolicStage.getClass.getSimpleName + println( + s"${module.getClass.getSimpleName}(${symbolicName})" + ) + case graphStage: GraphStage[_] => + val name = graphStage.getClass.getSimpleName + println( + s"${module.getClass.getSimpleName}(${name})" + ) + case 
other => + println( + s"${module.getClass.getSimpleName}(${other.getClass.getSimpleName})" + ) + } + case _ => + println(module.getClass.getSimpleName) + } + // scalastyle:on println + } + } + override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat], subflowFuser: GraphInterpreterShell => ActorRef): Mat = { materialize(runnableGraph) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala index 2ce4e19..40cd556 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala @@ -21,6 +21,7 @@ package org.apache.gearpump.akkastream.example import akka.actor.{Actor, ActorSystem, Props} import akka.stream.scaladsl.{Sink, Source} import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.akkastream.graph.GraphPartitioner import org.apache.gearpump.cluster.main.ArgumentsParser import org.apache.gearpump.util.AkkaApp @@ -37,7 +38,7 @@ object Test extends AkkaApp with ArgumentsParser { // scalastyle:off println override def main(akkaConf: Config, args: Array[String]): Unit = { implicit val system = ActorSystem("Test", akkaConf) - implicit val materializer = GearpumpMaterializer() + implicit val materializer = GearpumpMaterializer(GraphPartitioner.AllRemoteStrategy) val echo = system.actorOf(Props(new Echo())) val sink = Sink.actorRef(echo, "COMPLETE") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala 
---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala index 7e2211d..830f278 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala @@ -81,13 +81,11 @@ object WikipediaApp extends ArgumentsParser with AkkaApp { } ) - g.run().onComplete { x => - x match { - case Success((t, f)) => printResults(t, f) - // scalastyle:off println - case Failure(tr) => println("Something went wrong") - // scalastyle:on println - } + g.run().onComplete { + case Success((t, f)) => printResults(t, f) + // scalastyle:off println + case Failure(tr) => println("Something went wrong") + // scalastyle:on println } Await.result(system.whenTerminated, 60.minutes) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala index f3f8094..936ac29 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala @@ -28,14 +28,16 @@ import akka.stream.impl.{HeadOptionStage, Stages, Throttle} import akka.stream.scaladsl._ import akka.stream.stage.AbstractStage.PushPullGraphStageWithMaterializedValue import 
akka.stream.stage.GraphStage -import akka.stream.{FanInShape, FanOutShape} import org.apache.gearpump.akkastream.GearAttributes import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge import org.apache.gearpump.akkastream.module._ import org.apache.gearpump.akkastream.task._ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.dsl.StreamApp -import org.apache.gearpump.streaming.dsl.op._ +import org.apache.gearpump.streaming.dsl.plan._ +import org.apache.gearpump.streaming.dsl.plan.functions.FlatMapFunction +import org.apache.gearpump.streaming.dsl.window.api.CountWindow +import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow import org.apache.gearpump.streaming.{ProcessorId, StreamApplication} import org.apache.gearpump.util.Graph import org.slf4j.LoggerFactory @@ -96,14 +98,14 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { vertex.shape.inlets.flatMap { inlet => graph.incomingEdgesOf(vertex).find( _._2.to == inlet).map(_._1 - ).flatMap(processorIds.get(_)) + ).flatMap(processorIds.get) }.toList } def outProcessors(vertex: Module): List[ProcessorId] = { vertex.shape.outlets.flatMap { outlet => graph.outgoingEdgesOf(vertex).find( _._2.from == outlet).map(_._3 - ).flatMap(processorIds.get(_)) + ).flatMap(processorIds.get) }.toList } processorIds.get(vertex).map(processorId => { @@ -165,6 +167,8 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { reduceOp(reduce.f, conf) case graphStage: GraphStageModule => translateGraphStageWithMaterializedValue(graphStage, parallelism, conf) + case _ => + null } if (op == null) { throw new UnsupportedOperationException( @@ -174,12 +178,11 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { op }.mapEdge[OpEdge] { (n1, edge, n2) => n2 match { - case master: MasterOp => - Shuffle - case slave: SlaveOp[_] if n1.isInstanceOf[ProcessorOp[_]] => - Shuffle - case slave: SlaveOp[_] => + case 
chainableOp: ChainableOp[_, _] + if !n1.isInstanceOf[ProcessorOp[_]] && !n2.isInstanceOf[ProcessorOp[_]] => Direct + case _ => + Shuffle } } (opGraph, matValues.toMap) @@ -237,7 +240,8 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { withValue(FoldTask.AGGREGATOR, fold.f) ProcessorOp(classOf[FoldTask[_, _]], parallelism, foldConf, "fold") case groupBy: GroupBy[_, _] => - GroupByOp(groupBy.keyFor, groupBy.maxSubstreams, "groupBy", conf) + GroupByOp(GroupAlsoByWindow(groupBy.keyFor, CountWindow.apply(1).accumulating), + groupBy.maxSubstreams, "groupBy", conf) case groupedWithin: GroupedWithin[_] => val diConf = conf.withValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW, groupedWithin.d). withInt(GroupedWithinTask.BATCH_SIZE, groupedWithin.n) @@ -318,11 +322,11 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { // TODO null case unzip: Unzip[_, _] => - ProcessorOp(classOf[Unzip2Task[_, _, _]], - parallelism, - conf.withValue( - Unzip2Task.UNZIP2_FUNCTION, Unzip2Task.UnZipFunction(unzip.unzipper) - ), "unzip") +// ProcessorOp(classOf[Unzip2Task[_, _, _]], parallelism, +// conf.withValue( +// Unzip2Task.UNZIP2_FUNCTION, Unzip2Task.UnZipFunction(unzip.unzipper)), "unzip") + // TODO + null case zip: Zip[_, _] => zipWithOp(zip.zipper, conf) case zipWith2: ZipWith2[_, _, _] => @@ -474,10 +478,10 @@ object RemoteMaterializerImpl { def flatMapOp[In, Out](fun: In => TraversableOnce[Out], description: String, conf: UserConfig): Op = { - FlatMapOp(fun, description, conf) + ChainableOp(new FlatMapFunction[In, Out](fun, description), conf) } - def conflatOp[In, Out](seed: In => Out, aggregate: (Out, In) => Out, + def conflateOp[In, Out](seed: In => Out, aggregate: (Out, In) => Out, conf: UserConfig): Op = { var agg = None: Option[Out] val flatMap = {elem: In => @@ -489,7 +493,7 @@ object RemoteMaterializerImpl { } List(agg.get) } - flatMapOp (flatMap, "conflat", conf) + flatMapOp (flatMap, "conflate", conf) } def 
foldOp[In, Out](zero: Out, fold: (Out, In) => Out, conf: UserConfig): Op = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala index 85b1d5e..80619ef 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala @@ -56,7 +56,7 @@ object GearSource{ * */ def from[OUT](source: DataSource): Source[OUT, Unit] = { - val taskSource = new Source[OUT, Unit](new SourceTaskModule(source, UserConfig.empty)) + val taskSource = new Source[OUT, Unit](SourceTaskModule(source, UserConfig.empty)) taskSource } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala index 99f1b55..7dd91fc 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala @@ -40,7 +40,7 @@ class Unzip2Task[In, A1, A2](context: TaskContext, userConf : UserConfig) } object Unzip2Task { - case class UnZipFunction[In, A1, A2](val unzip: In => (A1, A2)) extends Serializable + case class UnZipFunction[In, A1, A2](unzip: In => (A1, A2)) extends Serializable val UNZIP2_FUNCTION = 
"org.apache.gearpump.akkastream.task.unzip2.function" } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/project/Build.scala ---------------------------------------------------------------------- diff --git a/project/Build.scala b/project/Build.scala index f1e0443..a1e6ca5 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -35,8 +35,7 @@ object Build extends sbt.Build { val copySharedSourceFiles = TaskKey[Unit]("copied shared services source code") - val akkaVersion = "2.4.10" - val akkaStreamVersion = "2.4-SNAPSHOT" + val akkaVersion = "2.4.11" val apacheRepo = "https://repository.apache.org/" val hadoopVersion = "2.6.0" val hbaseVersion = "1.0.0" @@ -148,10 +147,8 @@ object Build extends sbt.Build { "commons-logging" % "commons-logging" % commonsLoggingVersion, "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion exclude("com.typesafe.akka", "akka-stream_2.11"), - "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion, "org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided" - ), - dependencyOverrides += "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion + ) ) val coreDependencies = Seq( @@ -353,7 +350,6 @@ object Build extends sbt.Build { "com.github.scribejava" % "scribejava-apis" % "2.4.0", "com.ning" % "async-http-client" % "1.9.33", "org.webjars" % "angularjs" % "1.4.9", - "org.apache.hadoop" % "hadoop-common" % hadoopVersion, // angular 1.5 breaks ui-select, but we need ng-touch 1.5 "org.webjars.npm" % "angular-touch" % "1.5.0", @@ -417,12 +413,12 @@ object Build extends sbt.Build { settings = commonSettings ++ noPublish ++ Seq( libraryDependencies ++= Seq( - "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion, + "com.typesafe.akka" %% "akka-stream" % akkaVersion, + "org.apache.hadoop" % "hadoop-common" % hadoopVersion, "org.json4s" %% "json4s-jackson" % "3.2.11", "org.scalatest" %% "scalatest" % scalaTestVersion % "test" - ), - dependencyOverrides += "com.typesafe.akka" %% 
"akka-stream" % akkaStreamVersion - )) + ) + )) .dependsOn (services % "test->test; compile->compile", daemon % "test->test; compile->compile") .disablePlugins(sbtassembly.AssemblyPlugin) @@ -436,7 +432,7 @@ object Build extends sbt.Build { ), mainClass in(Compile, packageBin) := Some("org.apache.gearpump.example.Test") )) - .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided") + .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided") lazy val storm = Project( id = "gearpump-experiments-storm", http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/project/Pack.scala ---------------------------------------------------------------------- diff --git a/project/Pack.scala b/project/Pack.scala index 47d3064..1c87653 100644 --- a/project/Pack.scala +++ b/project/Pack.scala @@ -69,8 +69,7 @@ object Pack extends sbt.Build { "worker" -> "org.apache.gearpump.cluster.main.Worker", "services" -> "org.apache.gearpump.services.main.Services", "yarnclient" -> "org.apache.gearpump.experiments.yarn.client.Client", - "storm" -> "org.apache.gearpump.experiments.storm.StormRunner", - "akkastream" -> "org.apache.gearpump.akkastream.example.Test11" + "storm" -> "org.apache.gearpump.experiments.storm.StormRunner" ), packJvmOpts := Map( "gear" -> Seq("-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}"), @@ -110,13 +109,7 @@ object Pack extends sbt.Build { "storm" -> Seq( "-server", "-Djava.net.preferIPv4Stack=true", - "-Dgearpump.home=${PROG_HOME}"), - - "akkastream" -> Seq( - "-server", - "-Djava.net.preferIPv4Stack=true", - "-Dgearpump.home=${PROG_HOME}", - "-Djava.rmi.server.hostname=localhost") + "-Dgearpump.home=${PROG_HOME}") ), packLibDir := Map( "lib" -> new ProjectsToPack(core.id, streaming.id), @@ -148,14 +141,13 @@ object Pack extends sbt.Build { "worker" -> daemonClassPath, "services" -> serviceClassPath, "yarnclient" -> yarnClassPath, - "storm" -> stormClassPath, - "akkstream" -> daemonClassPath 
+ "storm" -> stormClassPath ), packArchivePrefix := projectName + "-" + scalaBinaryVersion.value, packArchiveExcludes := Seq("integrationtest") ) - ).dependsOn(core, streaming, services, yarn, storm, akkastream). + ).dependsOn(core, streaming, services, yarn, storm). disablePlugins(sbtassembly.AssemblyPlugin) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala index a6588a1..66ec873 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala @@ -123,7 +123,7 @@ object LifeTime { */ class StreamApplication( override val name: String, val inputUserConfig: UserConfig, - dag: Graph[ProcessorDescription, PartitionerDescription]) + val dag: Graph[ProcessorDescription, PartitionerDescription]) extends Application { require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f1bec670/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala index 744976b..b2c5506 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala @@ -25,7 +25,8 @@ import org.apache.gearpump.streaming.Processor.DefaultProcessor import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction import 
org.apache.gearpump.streaming.{Constants, Processor} import org.apache.gearpump.streaming.dsl.task.TransformTask -import org.apache.gearpump.streaming.dsl.window.api.GroupByFn +import org.apache.gearpump.streaming.dsl.window.api.{CountWindow, GroupByFn} +import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor} import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask} import org.apache.gearpump.streaming.task.Task @@ -124,11 +125,11 @@ case class DataSinkOp( * to another Op to be used */ case class ChainableOp[IN, OUT]( - fn: SingleInputFunction[IN, OUT]) extends Op { + fn: SingleInputFunction[IN, OUT], + userConfig: UserConfig = UserConfig.empty) extends Op { override def description: String = fn.description - override def userConfig: UserConfig = UserConfig.empty override def chain(other: Op)(implicit system: ActorSystem): Op = { other match { @@ -141,7 +142,17 @@ case class ChainableOp[IN, OUT]( } override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { - throw new UnsupportedOperationException("ChainedOp cannot be translated to Processor") + Processor[TransformTask[Any, Any]](1, description, + userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, fn)) + } +} + +object GroupByOp { + + def apply[IN, GROUP](groupBy: IN => GROUP, parallelism: Int, + description: String, userConfig: UserConfig): Op = { + GroupByOp(GroupAlsoByWindow(groupBy, CountWindow.apply(1).accumulating), parallelism, + description, userConfig) } }
