http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala index 4bec092..9852ed0 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,19 +21,18 @@ package akka.stream.gearpump.materializer import akka.actor.ActorSystem import akka.stream.ModuleGraph.Edge import akka.stream.gearpump.GearAttributes -import akka.stream.gearpump.module.{ProcessorModule, ReduceModule, GroupByModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule} -import akka.stream.gearpump.task.{BalanceTask, BroadcastTask, GraphTask, UnZip2Task, SinkBridgeTask, SourceBridgeTask} -import akka.stream.impl.GenJunctions.{UnzipWith2Module, ZipWithModule} -import akka.stream.impl.Junctions._ -import akka.stream.impl.{FlexiRouteImpl, Stages} -import akka.stream.impl.Stages.{MaterializingStageFactory, StageModule} +import akka.stream.gearpump.module.{GroupByModule, ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule} +import akka.stream.gearpump.task.{BalanceTask, BroadcastTask, GraphTask, SinkBridgeTask, SourceBridgeTask, UnZip2Task} +import akka.stream.impl.Stages +import akka.stream.impl.Stages.StageModule import akka.stream.impl.StreamLayout.Module +import org.slf4j.LoggerFactory + import io.gearpump.cluster.UserConfig import io.gearpump.streaming.dsl.StreamApp -import io.gearpump.streaming.dsl.op.{GroupByOp, DataSinkOp, DataSourceOp, Direct, FlatMapOp, MasterOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle, SlaveOp} +import io.gearpump.streaming.dsl.op.{DataSinkOp, DataSourceOp, Direct, FlatMapOp, GroupByOp, MasterOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle, SlaveOp} import io.gearpump.streaming.{ProcessorId, StreamApplication} import io.gearpump.util.Graph -import org.slf4j.LoggerFactory /** * [[RemoteMaterializerImpl]] will materialize the [[Graph[Module, Edge]] to a Gearpump @@ -118,7 +117,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { } private def cleanClues(app: StreamApplication): StreamApplication = { - val graph = app.dag.mapVertex{ processor => + val graph = app.dag.mapVertex { processor => val conf = cleanClue(processor.taskConf) processor.copy(taskConf = conf) } @@ -126,7 +125,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { } private def cleanClue(conf: UserConfig): UserConfig = { - conf.filter{kv => + conf.filter { kv => kv._2 != RemoteMaterializerImpl.STAINS } } @@ -181,7 +180,6 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { (opGraph, matValues) } - private def translateStage(module: StageModule, conf: UserConfig): Op = { module match { case buffer: Stages.Buffer => @@ -252,10 +250,10 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { } private def translateFanIn( - fanIn: FanInModule, - edges: List[(Module, Edge, Module)], - parallelism: Int, - conf: UserConfig): Op = { + fanIn: FanInModule, + edges: List[(Module, Edge, Module)], + parallelism: Int, + conf: UserConfig): Op = { fanIn match { case merge: MergeModule[_] => MergeOp("merge", conf) @@ -275,10 +273,10 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { } private def translateFanOut( - fanOut: FanOutModule, - edges: List[(Module, Edge, Module)], - parallelism: Int, - conf: UserConfig): Op = { + fanOut: FanOutModule, + edges: List[(Module, Edge, Module)], + parallelism: Int, + conf: UserConfig): Op = { fanOut match { case unzip2: UnzipWith2Module[Any, Any, Any] => val updatedConf = conf.withValue(UnZip2Task.UNZIP2_FUNCTION, new UnZip2Task.UnZipFunction(unzip2.f)) @@ -332,7 +330,7 @@ object RemoteMaterializerImpl { } def mapOp(map: Any => Any, conf: UserConfig): Op = { - flatMapOp ({ data: Any => + flatMapOp({ data: Any => List(map(data)) }, "map", conf) } @@ -346,8 +344,8 @@ object RemoteMaterializerImpl { } def conflatOp(seed: Any => Any, aggregate: (Any, Any) => Any, conf: UserConfig): Op = { - var agg : Any = null - val flatMap = {elem: Any => + var agg: Any = null + val flatMap = { elem: Any => agg = if (agg == null) { seed(elem) } else { @@ -356,7 +354,7 @@ object RemoteMaterializerImpl { List(agg) } - flatMapOp (flatMap, "map", conf) + flatMapOp(flatMap, "map", conf) } def foldOp(zero: Any, fold: (Any, Any) => Any, conf: UserConfig): Op = { @@ -376,7 +374,7 @@ object RemoteMaterializerImpl { b } - val flatMap: Any=>Iterable[Any] = {input: Any => + val flatMap: Any => Iterable[Any] = { input: Any => buf += input left -= 1 if (left == 0) { @@ -393,7 +391,7 @@ object RemoteMaterializerImpl { def dropOp(number: Long, conf: UserConfig): Op = { var left = number - val flatMap: Any=>Iterable[Any] = {input: Any => + val flatMap: Any => Iterable[Any] = { input: Any => if (left > 0) { left -= 1 None @@ -404,14 +402,14 @@ object RemoteMaterializerImpl { flatMapOp(flatMap, "drop", conf) } - def dropWhileOp(drop: Any=>Boolean, conf: UserConfig): Op = { + def dropWhileOp(drop: Any => Boolean, conf: UserConfig): Op = { flatMapOp({ data => - if (drop(data)) None else Option(data) + if (drop(data)) None else Option(data) }, "dropWhile", conf) } - def logOp(name: String, extract: Any=>Any, conf: UserConfig): Op = { - val flatMap = {elem: Any => + def logOp(name: String, extract: Any => Any, conf: UserConfig): Op = { + val flatMap = { elem: Any => LoggerFactory.getLogger(name).info(s"Element: {${extract(elem)}}") List(elem) } @@ -422,7 +420,7 @@ object RemoteMaterializerImpl { var aggregator = zero var pushedZero = false - val flatMap = {elem: Any => + val flatMap = { elem: Any => aggregator = f(aggregator, elem) if (pushedZero) { @@ -438,7 +436,7 @@ object RemoteMaterializerImpl { def takeOp(count: Long, conf: UserConfig): Op = { var left: Long = count - val filter: Any=>Iterable[Any] = {elem: Any => + val filter: Any => Iterable[Any] = { elem: Any => left -= 1 if (left > 0) Some(elem) else if (left == 0) Some(elem)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala index 8a154c3..c5dfc9a 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -26,10 +26,10 @@ import org.reactivestreams.{Publisher, Subscriber} /** * * - * [[IN]] -> [[BridgeModule]] -> [[OUT]] - * / - * / - * out of band data input or output channel [[MAT]] + * [[IN]] -> [[BridgeModule]] -> [[OUT]] + * / + * / + * out of band data input or output channel [[MAT]] * * * [[BridgeModule]] is used as a bridge between different materializers. @@ -38,18 +38,18 @@ import org.reactivestreams.{Publisher, Subscriber} * * For example: * - * Remote Materializer - * ----------------------------- - * | | - * | BridgeModule -> RemoteSink | - * | / | - * --/---------------------------- - * Local Materializer / out of band channel. - * ----------------------/---- - * | Local / | - * | Source -> BridgeModule | - * | | - * --------------------------- + * Remote Materializer + * ----------------------------- + * | | + * | BridgeModule -> RemoteSink | + * | / | + * --/---------------------------- + * Local Materializer / out of band channel. + * ----------------------/---- + * | Local / | + * | Source -> BridgeModule | + * | | + * --------------------------- * * * Typically [[BridgeModule]] is created implicitly as a temporary intermediate @@ -64,7 +64,7 @@ import org.reactivestreams.{Publisher, Subscriber} * @tparam OUT * @tparam MAT */ -abstract class BridgeModule[IN, OUT, MAT] extends FlowModule[IN, OUT, MAT]{ +abstract class BridgeModule[IN, OUT, MAT] extends FlowModule[IN, OUT, MAT] { def attributes: Attributes def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, MAT] @@ -72,17 +72,16 @@ abstract class BridgeModule[IN, OUT, MAT] extends FlowModule[IN, OUT, MAT]{ override def carbonCopy: Module = newInstance } - /** * * Bridge module which accept out of band channel Input * [[org.reactivestreams.Publisher]][IN]. * * - * [[SourceBridgeModule]] -> [[OUT]] - * /| - * / - * out of band data input [[org.reactivestreams.Publisher]][IN] + * [[SourceBridgeModule]] -> [[OUT]] + * /| + * / + * out of band data input [[org.reactivestreams.Publisher]][IN] * * @see [[BridgeModule]] * @@ -94,7 +93,7 @@ class SourceBridgeModule[IN, OUT](val attributes: Attributes = Attributes.name(" override protected def newInstance: BridgeModule[IN, OUT, Subscriber[IN]] = new SourceBridgeModule[IN, OUT](attributes) override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Subscriber[IN]] = { - new SourceBridgeModule( attributes) + new SourceBridgeModule(attributes) } } @@ -104,11 +103,11 @@ class SourceBridgeModule[IN, OUT](val attributes: Attributes = Attributes.name(" * [[org.reactivestreams.Subscriber]][OUT]. * * - * [[IN]] -> [[BridgeModule]] - * \ - * \ - * \| - * out of band data output [[org.reactivestreams.Subscriber]][OUT] + * [[IN]] -> [[BridgeModule]] + * \ + * \ + * \| + * out of band data output [[org.reactivestreams.Subscriber]][OUT] * * @see [[BridgeModule]] * http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala index fcda327..bc744f9 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -49,7 +49,6 @@ import org.reactivestreams.{Publisher, Subscriber} */ trait DummyModule extends Module - /** * * [[DummySource]]-> [[BridgeModule]] -> Sink @@ -77,7 +76,6 @@ class DummySource[Out](val attributes: Attributes, shape: SourceShape[Out]) } } - /** * * Source-> [[BridgeModule]] -> [[DummySink]] @@ -88,7 +86,7 @@ class DummySource[Out](val attributes: Attributes, shape: SourceShape[Out]) * * @param attributes * @param shape - */ + */ class DummySink[IN](val attributes: Attributes, shape: SinkShape[IN]) extends SinkModule[IN, Unit](shape) with DummyModule { override def create(context: MaterializationContext): (Subscriber[IN], Unit) = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala index d87d689..4b7d3ac 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,6 +21,7 @@ package akka.stream.gearpump.module import akka.stream.impl.FlowModule import akka.stream.impl.StreamLayout.Module import akka.stream.{Attributes, Inlet, Outlet, Shape, SinkShape, SourceShape} + import io.gearpump.cluster.UserConfig import io.gearpump.streaming.sink.DataSink import io.gearpump.streaming.source.DataSource @@ -31,7 +32,7 @@ import io.gearpump.streaming.task.Task * * This is specially designed for Gearpump runtime. It is not supposed to be used * for local materializer. - * + * */ trait GearpumpTaskModule extends Module @@ -44,15 +45,19 @@ trait GearpumpTaskModule extends Module * @tparam T */ final case class SourceTaskModule[T]( - source: DataSource, - conf: UserConfig, - shape: SourceShape[T] = SourceShape[T](Outlet[T]("SourceTaskModule.out")), - attributes: Attributes = Attributes.name("SourceTaskModule")) + source: DataSource, + conf: UserConfig, + shape: SourceShape[T] = SourceShape[T](Outlet[T]("SourceTaskModule.out")), + attributes: Attributes = Attributes.name("SourceTaskModule")) extends GearpumpTaskModule { override def subModules: Set[Module] = Set.empty - override def withAttributes(attr: Attributes): Module = this.copy(shape = amendShape(attr), attributes = attr) - override def carbonCopy: Module = this.copy(shape = SourceShape(Outlet[T]("SourceTaskModule.out"))) + override def withAttributes(attr: Attributes): Module = { + this.copy(shape = amendShape(attr), attributes = attr) + } + override def carbonCopy: Module = { + this.copy(shape = SourceShape(Outlet[T]("SourceTaskModule.out"))) + } override def replaceShape(s: Shape): Module = if (s == shape) this @@ -83,7 +88,9 @@ final case class SinkTaskModule[IN]( extends GearpumpTaskModule { override def subModules: Set[Module] = Set.empty - override def withAttributes(attr: Attributes): Module = this.copy(shape = amendShape(attr), attributes = attr) + override def withAttributes(attr: Attributes): Module = { + this.copy(shape = amendShape(attr), attributes = attr) + } override def carbonCopy: Module = this.copy(shape = SinkShape(Inlet[IN]("SinkTaskModule.in"))) override def replaceShape(s: Shape): Module = @@ -116,9 +123,11 @@ case class ProcessorModule[IN, OUT, Unit]( override def carbonCopy: Module = newInstance - protected def newInstance: ProcessorModule[IN,OUT, Unit] = new ProcessorModule[IN,OUT, Unit](processor, conf, attributes) + protected def newInstance: ProcessorModule[IN, OUT, Unit] = { + new ProcessorModule[IN, OUT, Unit](processor, conf, attributes) + } - override def withAttributes(attributes: Attributes): ProcessorModule[IN,OUT, Unit] = { - new ProcessorModule[IN,OUT, Unit](processor, conf, attributes) + override def withAttributes(attributes: Attributes): ProcessorModule[IN, OUT, Unit] = { + new ProcessorModule[IN, OUT, Unit](processor, conf, attributes) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala index 6871e71..e57a6f6 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,7 +22,6 @@ import akka.stream.Attributes import akka.stream.impl.FlowModule import akka.stream.impl.StreamLayout.Module - /** * * Group the T value groupBy function @@ -32,11 +31,14 @@ import akka.stream.impl.StreamLayout.Module * @tparam T * @tparam Group */ -case class GroupByModule[T, Group](val groupBy: T => Group, val attributes: Attributes = Attributes.name("groupByModule")) extends FlowModule[T, T, Unit]{ +case class GroupByModule[T, Group](val groupBy: T => Group, + val attributes: Attributes = Attributes.name("groupByModule")) extends FlowModule[T, T, Unit] { override def carbonCopy: Module = newInstance - protected def newInstance: GroupByModule[T, Group] = new GroupByModule[T, Group](groupBy, attributes) + protected def newInstance: GroupByModule[T, Group] = { + new GroupByModule[T, Group](groupBy, attributes) + } override def withAttributes(attributes: Attributes): GroupByModule[T, Group] = { new GroupByModule[T, Group](groupBy, attributes) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala index e28fbfc..926feb6 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,7 +22,6 @@ import akka.stream.Attributes import akka.stream.impl.FlowModule import akka.stream.impl.StreamLayout.Module - /** * * Reduce Module @@ -31,7 +30,9 @@ import akka.stream.impl.StreamLayout.Module * @param attributes * @tparam T */ -case class ReduceModule[T](val f: (T, T) => T, val attributes: Attributes = Attributes.name("reduceModule")) extends FlowModule[T, T, Unit]{ +case class ReduceModule[T]( + val f: (T, T) => T, val attributes: Attributes = Attributes.name("reduceModule")) + extends FlowModule[T, T, Unit] { override def carbonCopy: Module = newInstance http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala index df032c8..a9f6e97 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,18 +18,17 @@ package akka.stream.gearpump.scaladsl -import akka.stream.{FlowShape, Graph, Attributes} -import akka.stream.gearpump.module.{ProcessorModule, ReduceModule, GroupByModule, DummySink, DummySource, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule} -import akka.stream.impl.Stages.Map -import akka.stream.scaladsl.{FlowOps, Flow, Keep, Sink, Source} +import akka.stream.Attributes +import akka.stream.gearpump.module.{DummySink, DummySource, GroupByModule, ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule} +import akka.stream.scaladsl.{Flow, Keep, Sink, Source} +import org.reactivestreams.{Publisher, Subscriber} + import io.gearpump.cluster.UserConfig import io.gearpump.streaming.sink.DataSink import io.gearpump.streaming.source.DataSource import io.gearpump.streaming.task.Task -import org.reactivestreams.{Publisher, Subscriber} - -object GearSource{ +object GearSource { /** * Construct a Source which accepts out of band input messages. @@ -53,7 +52,7 @@ object GearSource{ /** * Construct a Source from Gearpump [[DataSource]]. * - * [[SourceTaskModule]] -> downstream Sink + * [[SourceTaskModule]] -> downstream Sink * */ def from[OUT](source: DataSource): Source[OUT, Unit] = { @@ -64,7 +63,7 @@ object GearSource{ /** * Construct a Source from Gearpump [[io.gearpump.streaming.Processor]]. * - * [[ProcessorModule]] -> downstream Sink + * [[ProcessorModule]] -> downstream Sink * */ def from[OUT](processor: Class[_ <: Task], conf: UserConfig): Source[OUT, Unit] = { @@ -98,7 +97,7 @@ object GearSink { /** * Construct a Sink from Gearpump [[DataSink]]. * - * Upstream Source -> [[SinkTaskModule]] + * Upstream Source -> [[SinkTaskModule]] * */ def to[IN](sink: DataSink): Sink[IN, Unit] = { @@ -109,7 +108,7 @@ object GearSink { /** * Construct a Sink from Gearpump [[io.gearpump.streaming.Processor]]. * - * Upstream Source -> [[ProcessorModule]] + * Upstream Source -> [[ProcessorModule]] * */ def to[IN](processor: Class[_ <: Task], conf: UserConfig): Sink[IN, Unit] = { @@ -147,8 +146,8 @@ object GearSink { * sink will only operate on the main stream. * */ -object GroupBy{ - def apply[T, Group](groupBy: T=>Group): Flow[T, T, Unit] = { +object GroupBy { + def apply[T, Group](groupBy: T => Group): Flow[T, T, Unit] = { new Flow[T, T, Unit](new GroupByModule(groupBy)) } } @@ -160,19 +159,18 @@ object GroupBy{ * * */ -object Reduce{ +object Reduce { def apply[T](reduce: (T, T) => T): Flow[T, T, Unit] = { new Flow[T, T, Unit](new ReduceModule(reduce)) } } - /** * Create a Flow by providing a Gearpump Processor class and configuration * * */ -object Processor{ +object Processor { def apply[In, Out](processor: Class[_ <: Task], conf: UserConfig): Flow[In, Out, Unit] = { new Flow[In, Out, Unit](new ProcessorModule[In, Out, Unit](processor, conf)) } @@ -193,7 +191,6 @@ object Implicits { source.via[T, Unit](stage) } - def reduce(reduce: (T, T) => T): Source[T, Mat] = { val stage = Reduce.apply(reduce) source.via[T, Unit](stage) @@ -240,13 +237,10 @@ object Implicits { } /** - * do sum on values + * Does sum on values * * Before doing this, you need to do groupByKey to group same key together * , otherwise, it will do the sum no matter what current key is. - * - * @param numeric - * @return */ def sumOnValue(implicit numeric: Numeric[V]): Source[(K, V), Mat] = { val stage = Reduce.apply(sumByValue[K, V](numeric)) @@ -255,13 +249,13 @@ object Implicits { } /** - * Help util to support groupByKey and sum + * Helper util to support groupByKey and sum */ implicit class KVFlowOps[K, V, Mat](flow: Flow[(K, V), (K, V), Mat]) { /** - * if it is a KV Pair, we can group the KV pair by the key. - * @return + * If it is a KV Pair, we can group the KV pair by the key. + * */ def groupByKey: Flow[(K, V), (K, V), Mat] = { val stage = GroupBy.apply(getTupleKey[K, V]) @@ -274,8 +268,6 @@ object Implicits { * Before doing this, you need to do groupByKey to group same key together * , otherwise, it will do the sum no matter what current key is. * - * @param numeric - * @return */ def sumOnValue(implicit numeric: Numeric[V]): Flow[(K, V), (K, V), Mat] = { val stage = Reduce.apply(sumByValue[K, V](numeric)) @@ -285,6 +277,6 @@ object Implicits { private def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1 - private def sumByValue[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V] - = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2)) + private def sumByValue[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V] = + (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2)) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala index e41b771..58a04ca 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,12 +22,12 @@ import io.gearpump.Message import io.gearpump.cluster.UserConfig import io.gearpump.streaming.task.TaskContext -class BalanceTask(context: TaskContext, userConf : UserConfig) extends GraphTask(context, userConf) { +class BalanceTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) { val sizeOfOutputs = sizeOfOutPorts var index = 0 - override def onNext(msg : Message) : Unit = { + override def onNext(msg: Message): Unit = { output(index, msg) index += 1 if (index == sizeOfOutputs) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala index b61813c..388806e 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,8 +22,8 @@ import io.gearpump.Message import io.gearpump.cluster.UserConfig import io.gearpump.streaming.task.TaskContext -class BroadcastTask(context: TaskContext, userConf : UserConfig) extends GraphTask(context, userConf) { - override def onNext(msg : Message) : Unit = { +class BroadcastTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) { + override def onNext(msg: Message): Unit = { context.output(msg) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala index e5243f9..d3f483d 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,29 +19,32 @@ package akka.stream.gearpump.task import akka.stream.gearpump.task.GraphTask.{Index, PortId} + import io.gearpump.Message import io.gearpump.cluster.UserConfig import io.gearpump.streaming.ProcessorId import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskWrapper} - -class GraphTask(inputTaskContext : TaskContext, userConf : UserConfig) extends Task(inputTaskContext, userConf) { +class GraphTask(inputTaskContext: TaskContext, userConf: UserConfig) + extends Task(inputTaskContext, userConf) { private val context = inputTaskContext.asInstanceOf[TaskWrapper] - private val outMapping = portsMapping(userConf.getValue[List[ProcessorId]](GraphTask.OUT_PROCESSORS).get) - private val inMapping = portsMapping(userConf.getValue[List[ProcessorId]](GraphTask.IN_PROCESSORS).get) + private val outMapping = portsMapping(userConf.getValue[List[ProcessorId]]( + GraphTask.OUT_PROCESSORS).get) + private val inMapping = portsMapping(userConf.getValue[List[ProcessorId]]( + GraphTask.IN_PROCESSORS).get) val sizeOfOutPorts = outMapping.keys.size val sizeOfInPorts = inMapping.keys.size - + private def portsMapping(processors: List[ProcessorId]): Map[PortId, Index] = { - val portToProcessor = processors.zipWithIndex.map{kv => + val portToProcessor = processors.zipWithIndex.map { kv => (kv._2, kv._1) }.toMap val processorToIndex = processors.sorted.zipWithIndex.toMap - val portToIndex = portToProcessor.map{kv => + val portToIndex = portToProcessor.map { kv => val (outlet, processorId) = kv val index = processorToIndex(processorId) (outlet, index) @@ -53,9 +56,9 @@ class GraphTask(inputTaskContext : TaskContext, userConf : UserConfig) extends T context.output(outMapping(outletId), msg) } - override def onStart(startTime : StartTime) : Unit = {} + override def onStart(startTime: StartTime): Unit = {} - override def onStop() : Unit = {} + override def onStop(): Unit = {} } object GraphTask { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala index 5a9a5b6..d7bacd5 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -25,6 +25,8 @@ import akka.actor.Actor.Receive import akka.actor.{Actor, ActorRef, ActorSystem, Props} import akka.stream.gearpump.task.SinkBridgeTask.RequestMessage import akka.util.Timeout +import org.reactivestreams.{Publisher, Subscriber, Subscription} + import io.gearpump.Message import io.gearpump.cluster.UserConfig import io.gearpump.cluster.client.ClientContext @@ -32,9 +34,6 @@ import io.gearpump.streaming.ProcessorId import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} import io.gearpump.util.LogUtil -import org.reactivestreams.{Publisher, Subscriber, Subscription} - -import scala.concurrent.ExecutionContext /** * Bridge Task when data flow is from remote Gearpump Task to local Akka-Stream Module @@ -47,11 +46,8 @@ import scala.concurrent.ExecutionContext * \| * Akka Stream [[Subscriber]] * - * - * @param taskContext - * @param userConf */ -class SinkBridgeTask (taskContext : TaskContext, userConf : UserConfig) extends Task(taskContext, userConf) { +class SinkBridgeTask(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { import taskContext.taskId val queue = new util.LinkedList[Message]() @@ -59,14 +55,14 @@ class SinkBridgeTask (taskContext : TaskContext, userConf : UserConfig) extends var request: Int = 0 - override def onStart(startTime : StartTime) : Unit = {} + override def onStart(startTime: StartTime): Unit = {} - override def onNext(msg : Message) : Unit = { + override def onNext(msg: Message): Unit = { queue.add(msg) trySendingData() } - override def onStop() : Unit = {} + override def onStop(): Unit = {} private def trySendingData(): Unit = { if (subscriber != null) { @@ -100,7 +96,7 @@ object SinkBridgeTask { private var actor: ActorRef = null import system.dispatcher - private val task = context.askAppMaster[TaskActorRef](appId, LookupTaskActorRef(taskId)).map{container => + private val task = context.askAppMaster[TaskActorRef](appId, LookupTaskActorRef(taskId)).map { container => // println("Successfully resolved taskRef for taskId " + taskId + ", " + container.task) container.task } @@ -115,7 +111,7 @@ object SinkBridgeTask { private implicit val timeout = Timeout(5, TimeUnit.SECONDS) override def request(l: Long): Unit = { - task.foreach{ task => + task.foreach { task => task.tell(RequestMessage(l.toInt), actor) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala index fae422b..b433a7f 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,18 +18,18 @@ package akka.stream.gearpump.task +import scala.concurrent.ExecutionContext + import akka.actor.Actor.Receive -import akka.actor.ActorSystem import akka.stream.gearpump.task.SourceBridgeTask.{AkkaStreamMessage, Complete, Error} +import org.reactivestreams.{Subscriber, Subscription} + import io.gearpump.Message import io.gearpump.cluster.UserConfig import io.gearpump.cluster.client.ClientContext import io.gearpump.streaming.ProcessorId import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} -import org.reactivestreams.{Subscriber, Subscription} - -import scala.concurrent.ExecutionContext /** * Bridge Task when data flow is from local Akka-Stream Module to remote Gearpump Task @@ -42,20 +42,17 @@ import scala.concurrent.ExecutionContext * / Local JVM * Akka Stream [[org.reactivestreams.Publisher]] * - * - * @param taskContext - * @param userConf */ -class SourceBridgeTask(taskContext : TaskContext, userConf : UserConfig) extends Task(taskContext, userConf) { +class SourceBridgeTask(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { import taskContext.taskId - override def onStart(startTime : StartTime) : Unit = {} + override def onStart(startTime: StartTime): Unit = {} - override def onNext(msg : Message) : Unit = { + override def onNext(msg: Message): Unit = { LOG.info("AkkaStreamSource receiving message " + msg) } - override def onStop() : Unit = {} + override def onStop(): Unit = {} override def receiveUnManagedMessage: Receive = { case Error(ex) => @@ -70,7 +67,6 @@ class SourceBridgeTask(taskContext : TaskContext, userConf : UserConfig) extends } } - object SourceBridgeTask { case class Error(ex: java.lang.Throwable) @@ -83,7 +79,7 @@ object SourceBridgeTask { var subscription: Subscription = null implicit val dispatcher = ec - val task = context.askAppMaster[TaskActorRef](appId, LookupTaskActorRef(taskId)).map{container => + val task = context.askAppMaster[TaskActorRef](appId, LookupTaskActorRef(taskId)).map { container => // println("Successfully resolved taskRef for taskId " + taskId + ", " + container.task) container.task } @@ -93,7 +89,6 @@ object SourceBridgeTask { } override def onSubscribe(subscription: Subscription): Unit = { - // when taskActorRef is resolved, request message from upstream this.subscription = subscription task.map(task => subscription.request(1)) } @@ -103,7 +98,7 @@ object SourceBridgeTask { } override def onNext(t: T): Unit = { - task.map{task => + task.map { task => task ! AkkaStreamMessage(t) } subscription.request(1) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala index 12470df..0b3b9a5 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,15 +19,16 @@ package akka.stream.gearpump.task import akka.stream.gearpump.task.UnZip2Task.UnZipFunction + import io.gearpump.Message import io.gearpump.cluster.UserConfig import io.gearpump.streaming.task.TaskContext -class UnZip2Task(context: TaskContext, userConf : UserConfig) extends GraphTask(context, userConf) { +class UnZip2Task(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) { val unzip = userConf.getValue[UnZipFunction](UnZip2Task.UNZIP2_FUNCTION)(context.system).get.unzip - override def onNext(msg : Message) : Unit = { + override def onNext(msg: Message): Unit = { val message = msg.msg val time = msg.timestamp val pair = unzip(message) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala index 616a279..c774fc7 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -25,14 +25,14 @@ class MaterializedValueOps(mat: MaterializedValueNode) { def resolveMaterialized(mat: MaterializedValueNode, materializedValues: Map[Module, Any]): Any = mat match { case Atomic(m) => materializedValues.getOrElse(m, ()) case Combine(f, d1, d2) => f(resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues)) - case Transform(f, d) => f(resolveMaterialized(d, materializedValues)) - case Ignore => () + case Transform(f, d) => f(resolveMaterialized(d, materializedValues)) + case Ignore => () } resolveMaterialized(mat, materializedValues).asInstanceOf[Mat] } } -object MaterializedValueOps{ +object MaterializedValueOps { def apply(mat: MaterializedValueNode): MaterializedValueOps = new MaterializedValueOps(mat) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala b/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala index 97a52bf..4ead839 100644 --- a/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala +++ b/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,7 +19,7 @@ package akka.stream.gearpump import akka.stream.Attributes -import org.scalatest.{FlatSpec, Matchers, WordSpec} +import org.scalatest.{FlatSpec, Matchers} class AttributesSpec extends FlatSpec with Matchers { it should "merge the attributes together" in { @@ -30,5 +30,4 @@ class AttributesSpec extends FlatSpec with Matchers { assert("aa-bb" == c.nameOrDefault()) } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/README.md ---------------------------------------------------------------------- diff --git a/experiments/cgroup/README.md b/experiments/cgroup/README.md new file mode 100644 index 0000000..ca839cd --- /dev/null +++ b/experiments/cgroup/README.md @@ -0,0 +1 @@ +Please see http://gearpump.io for documentation on Cgroup. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CGroupResource.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CGroupResource.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CGroupResource.java index 9c7151e..973ad03 100644 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CGroupResource.java +++ b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CGroupResource.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -65,5 +65,4 @@ public class CGroupResource { public void setEnable(boolean enable) { this.enable = enable; } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCenter.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCenter.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCenter.java index 1ec7714..dc889ba 100644 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCenter.java +++ b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCenter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -161,7 +161,6 @@ public class CgroupCenter implements CgroupOperation { new File(hierarchy.getDir()).mkdirs(); String subSystems = CgroupUtils.reAnalyse(resourceTypes); SystemOperation.mount(subSystems, hierarchy.getDir(), "cgroup", subSystems); - } @Override @@ -210,5 +209,4 @@ public class CgroupCenter implements CgroupOperation { public static void main(String args[]) { System.out.println(CgroupCenter.getInstance().getHierarchies().get(0).getRootCgroups().getChildren().size()); } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommon.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommon.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommon.java index 1c644e0..5414814 100644 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommon.java +++ b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommon.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -217,5 +217,4 @@ public class CgroupCommon implements CgroupCommonOperation { } } } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommonOperation.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommonOperation.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommonOperation.java index e4620c5..7465645 100644 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommonOperation.java +++ b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommonOperation.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -43,5 +43,4 @@ public interface CgroupCommonOperation { public boolean getCgroupCloneChildren() throws IOException; public void setEventControl(String eventFd, String controlFd, String... args) throws IOException; - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCoreFactory.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCoreFactory.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCoreFactory.java index 139ca80..a719f91 100644 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCoreFactory.java +++ b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCoreFactory.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,7 +17,8 @@ */ package io.gearpump.cluster.cgroup; -import io.gearpump.cluster.cgroup.core.*; +import io.gearpump.cluster.cgroup.core.CgroupCore; +import io.gearpump.cluster.cgroup.core.CpuCore; import java.util.HashMap; import java.util.Map; @@ -38,5 +39,4 @@ public class CgroupCoreFactory { } return result; } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupOperation.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupOperation.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupOperation.java index 67f48f3..a3d830a 100644 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupOperation.java +++ b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupOperation.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -40,5 +40,4 @@ public interface CgroupOperation { public void create(CgroupCommon cgroup) throws SecurityException; public void delete(CgroupCommon cgroup) throws IOException; - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupUtils.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupUtils.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupUtils.java index fe30b23..0a7f97c 100644 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupUtils.java +++ b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupUtils.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -129,7 +129,6 @@ public class CgroupUtils { bw.write(string); bw.newLine(); bw.flush(); - } finally { CgroupUtils.close(writer, bw); } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Constants.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Constants.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Constants.java index 1b8ecbb..80e12be 100644 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Constants.java +++ b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Constants.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -26,5 +26,4 @@ public class Constants { public static String getDir(String dir, String constant) { return dir + constant; } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java index e904aeb..69ec1ed 100644 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java +++ b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java index c063482..c2a1d42 100644 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java +++ b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java index ae90b74..23e630c 100644 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java +++ b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,5 +22,4 @@ import io.gearpump.cluster.cgroup.ResourceType; public interface CgroupCore { public ResourceType getType(); - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java index 8578590..3402d5a 100644 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java +++ b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,7 +22,6 @@ import io.gearpump.cluster.cgroup.Constants; import io.gearpump.cluster.cgroup.ResourceType; import java.io.IOException; -import java.util.List; public class CpuCore implements CgroupCore { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java index 9117160..0772133 100644 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java +++ b/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -25,41 +25,39 @@ import java.io.IOException; public class SystemOperation { - public static final Logger LOG = LoggerFactory.getLogger(SystemOperation.class); - - public static void mount(String name, String target, String type, String data) throws IOException { - StringBuilder sb = new StringBuilder(); - sb.append("mount -t ").append(type).append(" -o ").append(data).append(" ").append(name).append(" ").append(target); - SystemOperation.exec(sb.toString()); - } - - public static void umount(String name) throws IOException { - StringBuilder sb = new StringBuilder(); - sb.append("umount ").append(name); - SystemOperation.exec(sb.toString()); - } - - public static String exec(String cmd) throws IOException { - LOG.debug("Shell cmd: " + cmd); - Process process = new ProcessBuilder(new String[] { "/bin/bash", "-c", cmd }).start(); - try { - process.waitFor(); - String output = IOUtils.toString(process.getInputStream()); - String errorOutput = IOUtils.toString(process.getErrorStream()); - LOG.debug("Shell Output: " + output); - if (errorOutput.length() != 0) { - LOG.error("Shell Error Output: " + errorOutput); - throw new IOException(errorOutput); - } - return output; - } catch (InterruptedException ie) { - throw new IOException(ie.toString()); - } - - } - - public static void main(String[] args) throws IOException { - SystemOperation.mount("test", "/cgroup/cpu", "cgroup", "cpu"); + public static final Logger LOG = LoggerFactory.getLogger(SystemOperation.class); + + public static void mount(String name, String target, String type, String data) throws IOException { + StringBuilder sb = new StringBuilder(); + sb.append("mount -t ").append(type).append(" -o ").append(data).append(" ").append(name).append(" ").append(target); + SystemOperation.exec(sb.toString()); + } + + public static void umount(String name) throws IOException { + StringBuilder sb = new StringBuilder(); + sb.append("umount ").append(name); + SystemOperation.exec(sb.toString()); + } + + public static String exec(String cmd) throws IOException { + LOG.debug("Shell cmd: " + cmd); + Process process = new ProcessBuilder(new String[]{"/bin/bash", "-c", cmd}).start(); + try { + process.waitFor(); + String output = IOUtils.toString(process.getInputStream()); + String errorOutput = IOUtils.toString(process.getErrorStream()); + LOG.debug("Shell Output: " + output); + if (errorOutput.length() != 0) { + LOG.error("Shell Error Output: " + errorOutput); + throw new IOException(errorOutput); + } + return output; + } catch (InterruptedException ie) { + throw new IOException(ie.toString()); } + } + public static void main(String[] args) throws IOException { + SystemOperation.mount("test", "/cgroup/cpu", "cgroup", "cpu"); + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala b/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala index a01d17a..ae7fb42 100644 --- a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala +++ b/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,11 +18,12 @@ package io.gearpump.cluster.worker import com.typesafe.config.Config +import org.apache.commons.lang.SystemUtils +import org.slf4j.{Logger, LoggerFactory} + import io.gearpump.cluster.cgroup.core.{CgroupCore, CpuCore} import io.gearpump.cluster.cgroup.{CgroupCenter, CgroupCommon, Hierarchy, ResourceType} import io.gearpump.cluster.worker.CGroupManager._ -import org.apache.commons.lang.SystemUtils -import org.slf4j.{LoggerFactory, Logger} class CGroupManager(config: Config) { private val center = CgroupCenter.getInstance() @@ -49,12 +50,13 @@ class CGroupManager(config: Config) { } private def validateCpuUpperLimitValue(value: Int): Int = { - if(value > 10) + if (value > 10) { 10 - else if(value < 1 && value != -1) + } else if (value < 1 && value != -1) { 1 - else + } else { value + } } private def setCpuUsageUpperLimit(cpuCore: CpuCore, cpuCoreUpperLimit: Int): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala b/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala index 9e7ce7b..eb57a18 100644 --- a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala +++ b/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,42 +18,46 @@ package io.gearpump.cluster.worker import java.io.File +import scala.sys.process.Process import com.typesafe.config.Config +import org.slf4j.{Logger, LoggerFactory} + import io.gearpump.cluster.scheduler.Resource import io.gearpump.util.{ProcessLogRedirector, RichProcess} -import org.slf4j.{LoggerFactory, Logger} - -import scala.sys.process.Process /** - * CGroupProcessLauncher is used to launch a process for Executor with CGroup. - * For more details, please refer http://gearpump.io - */ -class CGroupProcessLauncher(val config: Config) extends ExecutorProcessLauncher{ + * CGroupProcessLauncher is used to launch a process for Executor with CGroup. + * For more details, please refer http://gearpump.io + */ +class CGroupProcessLauncher(val config: Config) extends ExecutorProcessLauncher { private val APP_MASTER = -1 private val cgroupManager: Option[CGroupManager] = CGroupManager.getInstance(config) private val LOG: Logger = LoggerFactory.getLogger(getClass) override def cleanProcess(appId: Int, executorId: Int): Unit = { - if(executorId != APP_MASTER) { + if (executorId != APP_MASTER) { cgroupManager.foreach(_.shutDownExecutor(appId, executorId)) } } - override def createProcess(appId: Int, executorId: Int, resource: Resource, appConfig: Config, options: Array[String], + override def createProcess( + appId: Int, executorId: Int, resource: Resource, appConfig: Config, options: Array[String], classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = { val cgroupCommand = if (executorId != APP_MASTER) { - cgroupManager.map(_.startNewExecutor(appConfig, resource.slots, appId, executorId)).getOrElse(List.empty) + cgroupManager.map(_.startNewExecutor(appConfig, resource.slots, appId, + executorId)).getOrElse(List.empty) } else List.empty - LOG.info(s"Launch executor with CGroup ${cgroupCommand.mkString(" ")}, classpath: ${classPath.mkString(File.pathSeparator)}") + LOG.info(s"Launch executor with CGroup ${cgroupCommand.mkString(" ")}, " + + s"classpath: ${classPath.mkString(File.pathSeparator)}") val java = System.getProperty("java.home") + "/bin/java" - val command = cgroupCommand ++ List(java) ++ options ++ List("-cp", classPath.mkString(File.pathSeparator), mainClass) ++ arguments - LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")}; options: ${options.mkString(" ")}") + val command = cgroupCommand ++ List(java) ++ options ++ List("-cp", classPath + .mkString(File.pathSeparator), mainClass) ++ arguments + LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")}; " + + s"options: ${options.mkString(" ")}") val logger = new ProcessLogRedirector() val process = Process(command).run(logger) new RichProcess(process, logger) } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/README.md ---------------------------------------------------------------------- diff --git a/experiments/storm/README.md b/experiments/storm/README.md index f42547c..d02bc6d 100644 --- a/experiments/storm/README.md +++ b/experiments/storm/README.md @@ -3,14 +3,14 @@ on Gearpump. This documentation illustrates how to do so in a local Gearpump clu ## How to run a Storm application over Gearpump - 1. launch a local cluster + 1. Launch a local cluster ```bash ./target/pack/bin/local ``` - 2. submit a topology from storm-starter + 2. Submit a topology from storm-starter ```bash bin/storm -verbose -config storm.yaml -jar storm-starter-${STORM_VERSION}.jar storm.starter.ExclamationTopology exclamation @@ -27,4 +27,3 @@ on Gearpump. This documentation illustrates how to do so in a local Gearpump clu 1. Trident support is ongoing. - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java b/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java new file mode 100644 index 0000000..510258d --- /dev/null +++ b/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.gearpump.experiments.storm.util; + +import backtype.storm.utils.TimeCacheMap; + +/** + * Wrapper class to suppress "deprecation" warning, as scala doesn't support the suppression. + */ +@SuppressWarnings("deprecation") +public class TimeCacheMapWrapper<K, V> extends TimeCacheMap<K, V> { + + public TimeCacheMapWrapper (int expirationSecs, Callback<K, V> callback) { + super(expirationSecs, new ExpiredCallback<K, V>() { + + @Override + public void expire(K key, V val) { + callback.expire(key, val); + } + }); + } + + public static interface Callback<K, V> { + public void expire(K key, V val); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala index daa7aa5..19814e9 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,31 +18,34 @@ package io.gearpump.experiments.storm +import org.slf4j.Logger + import io.gearpump.experiments.storm.main.{GearpumpNimbus, GearpumpStormClient} import io.gearpump.util.LogUtil -import org.slf4j.Logger object StormRunner { private val LOG: Logger = LogUtil.getLogger(getClass) private val commands = Map("nimbus" -> GearpumpNimbus, "app" -> GearpumpStormClient) - private def usage: Unit = { + private def usage(): Unit = { val keys = commands.keys.toList.sorted + // scalastyle:off println Console.err.println("Usage: " + "<" + keys.mkString("|") + ">") + // scalastyle:on println } - private def executeCommand(command : String, commandArgs : Array[String]): Unit = { + private def executeCommand(command: String, commandArgs: Array[String]): Unit = { if (!commands.contains(command)) { - usage + usage() } else { commands(command).main(commandArgs) } } - def main(args: Array[String]) = { + def main(args: Array[String]): Unit = { if (args.length == 0) { - usage + usage() } else { val command = args(0) val commandArgs = args.drop(1)
