http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala deleted file mode 100644 index f23604a..0000000 --- a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.examples.wordcount - -import org.slf4j.Logger - -import io.gearpump.cluster.UserConfig -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.embedded.EmbeddedCluster -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import io.gearpump.partitioner.HashPartitioner -import io.gearpump.streaming.{Processor, StreamApplication} -import io.gearpump.util.Graph.Node -import io.gearpump.util.{AkkaApp, Graph, LogUtil} - -/** Same WordCount with low level Processor Graph syntax */ -object WordCount extends AkkaApp with ArgumentsParser { - private val LOG: Logger = LogUtil.getLogger(getClass) - val RUN_FOR_EVER = -1 - - override val options: Array[(String, CLIOption[Any])] = Array( - "split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)), - "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)), - "debug" -> CLIOption[Boolean]("<true|false>", required = false, defaultValue = Some(false)), - "sleep" -> CLIOption[Int]("how many seconds to sleep for debug mode", required = false, - defaultValue = Some(30)) - ) - - def application(config: ParseResult): StreamApplication = { - val splitNum = config.getInt("split") - val sumNum = config.getInt("sum") - val split = Processor[Split](splitNum) - val sum = Processor[Sum](sumNum) - val partitioner = new HashPartitioner - - val app = StreamApplication("wordCount", Graph(split ~ partitioner ~> sum), UserConfig.empty) - app - } - - override def main(akkaConf: Config, args: Array[String]): Unit = { - val config = parse(args) - - val debugMode = config.getBoolean("debug") - val sleepSeconds = config.getInt("sleep") - - val localCluster = if (debugMode) { - val cluster = new EmbeddedCluster(akkaConf: Config) - cluster.start() - Some(cluster) - } else { - None - } - - val context: ClientContext = localCluster match { - case Some(local) => local.newClientContext - case None => 
ClientContext(akkaConf) - } - - val app = application(config) - context.submit(app) - - if (debugMode) { - Thread.sleep(sleepSeconds * 1000) // Sleeps for 30 seconds for debugging. - } - - context.close() - localCluster.map(_.stop()) - } -} -
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala deleted file mode 100644 index ab8e8d0..0000000 --- a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.examples.wordcount.dsl - -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import io.gearpump.streaming.dsl.StreamApp -import io.gearpump.streaming.dsl.StreamApp._ -import io.gearpump.util.AkkaApp - -/** Same WordCount with High level DSL syntax */ -object WordCount extends AkkaApp with ArgumentsParser { - - override val options: Array[(String, CLIOption[Any])] = Array.empty - - override def main(akkaConf: Config, args: Array[String]): Unit = { - val context = ClientContext(akkaConf) - val app = StreamApp("dsl", context) - val data = "This is a good start, bingo!! bingo!!" - app.source(data.lines.toList, 1, "source"). - // word => (word, count) - flatMap(line => line.split("[\\s]+")).map((_, 1)). - // (word, count1), (word, count2) => (word, count1 + count2) - groupByKey().sum.log - - val appId = context.submit(app) - context.close() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala new file mode 100644 index 0000000..ae63f10 --- /dev/null +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.wordcount + +import java.util.concurrent.TimeUnit + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} + +class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + import taskContext.output + + override def onStart(startTime: StartTime): Unit = { + self ! Message("start") + } + + override def onNext(msg: Message): Unit = { + Split.TEXT_TO_SPLIT.lines.foreach { line => + line.split("[\\s]+").filter(_.nonEmpty).foreach { msg => + output(new Message(msg, System.currentTimeMillis())) + } + } + + import scala.concurrent.duration._ + taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self ! + Message("continue", System.currentTimeMillis())) + } +} + +object Split { + val TEXT_TO_SPLIT = + """ + | Licensed to the Apache Software Foundation (ASF) under one + | or more contributor license agreements. See the NOTICE file + | distributed with this work for additional information + | regarding copyright ownership. The ASF licenses this file + | to you under the Apache License, Version 2.0 (the + | "License"); you may not use this file except in compliance + | with the License. 
You may obtain a copy of the License at + | + | http://www.apache.org/licenses/LICENSE-2.0 + | + | Unless required by applicable law or agreed to in writing, software + | distributed under the License is distributed on an "AS IS" BASIS, + | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + | See the License for the specific language governing permissions and + | limitations under the License. + """.stripMargin +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala new file mode 100644 index 0000000..c3fa82a --- /dev/null +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.examples.wordcount + +import java.util.concurrent.TimeUnit +import scala.collection.mutable +import scala.concurrent.duration.FiniteDuration + +import akka.actor.Cancellable + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} + +class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + private[wordcount] val map: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]() + + private[wordcount] var wordCount: Long = 0 + private var snapShotTime: Long = System.currentTimeMillis() + private var snapShotWordCount: Long = 0 + + private var scheduler: Cancellable = null + + override def onStart(startTime: StartTime): Unit = { + scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS), + new FiniteDuration(30, TimeUnit.SECONDS))(reportWordCount) + } + + override def onNext(msg: Message): Unit = { + if (null != msg) { + val current = map.getOrElse(msg.msg.asInstanceOf[String], 0L) + wordCount += 1 + map.put(msg.msg.asInstanceOf[String], current + 1) + } + } + + override def onStop(): Unit = { + if (scheduler != null) { + scheduler.cancel() + } + } + + def reportWordCount(): Unit = { + val current: Long = System.currentTimeMillis() + LOG.info(s"Task ${taskContext.taskId} Throughput:" + + s" ${(wordCount - snapShotWordCount, (current - snapShotTime) / 1000)} (words, second)") + snapShotWordCount = wordCount + snapShotTime = current + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala new file mode 100644 index 0000000..9917d9f --- /dev/null +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.examples.wordcount + +import org.slf4j.Logger + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.embedded.EmbeddedCluster +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} +import org.apache.gearpump.partitioner.HashPartitioner +import org.apache.gearpump.streaming.{Processor, StreamApplication} +import org.apache.gearpump.util.Graph.Node +import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil} + +/** Same WordCount with low level Processor Graph syntax */ +object WordCount extends AkkaApp with ArgumentsParser { + private val LOG: Logger = LogUtil.getLogger(getClass) + val RUN_FOR_EVER = -1 + + override val options: Array[(String, CLIOption[Any])] = Array( + "split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)), + "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)), + "debug" -> CLIOption[Boolean]("<true|false>", required = false, defaultValue = Some(false)), + "sleep" -> CLIOption[Int]("how many seconds to sleep for debug mode", required = false, + defaultValue = Some(30)) + ) + + def application(config: ParseResult): StreamApplication = { + val splitNum = config.getInt("split") + val sumNum = config.getInt("sum") + val split = Processor[Split](splitNum) + val sum = Processor[Sum](sumNum) + val partitioner = new HashPartitioner + + val app = StreamApplication("wordCount", Graph(split ~ partitioner ~> sum), UserConfig.empty) + app + } + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + + val debugMode = config.getBoolean("debug") + val sleepSeconds = config.getInt("sleep") + + val localCluster = if (debugMode) { + val cluster = new EmbeddedCluster(akkaConf: Config) + cluster.start() + Some(cluster) + } else { + None + } + + val context: ClientContext = localCluster match { 
+ case Some(local) => local.newClientContext + case None => ClientContext(akkaConf) + } + + val app = application(config) + context.submit(app) + + if (debugMode) { + Thread.sleep(sleepSeconds * 1000) // Sleeps for 30 seconds for debugging. + } + + context.close() + localCluster.map(_.stop()) + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala new file mode 100644 index 0000000..22f597c --- /dev/null +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.examples.wordcount.dsl + +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.streaming.dsl.StreamApp +import org.apache.gearpump.streaming.dsl.StreamApp._ +import org.apache.gearpump.util.AkkaApp + +/** Same WordCount with High level DSL syntax */ +object WordCount extends AkkaApp with ArgumentsParser { + + override val options: Array[(String, CLIOption[Any])] = Array.empty + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val context = ClientContext(akkaConf) + val app = StreamApp("dsl", context) + val data = "This is a good start, bingo!! bingo!!" + app.source(data.lines.toList, 1, "source"). + // word => (word, count) + flatMap(line => line.split("[\\s]+")).map((_, 1)). + // (word, count1), (word, count2) => (word, count1 + count2) + groupByKey().sum.log + + val appId = context.submit(app) + context.close() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SplitSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SplitSpec.scala b/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SplitSpec.scala deleted file mode 100644 index 21e498e..0000000 --- a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SplitSpec.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.wordcount - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.ActorSystem -import akka.testkit.TestProbe -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest.{Matchers, WordSpec} - -import io.gearpump.Message -import io.gearpump.cluster.{TestUtil, UserConfig} -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.task.StartTime - -class SplitSpec extends WordSpec with Matchers { - - "Split" should { - "split the text and deliver to next task" in { - - val taskContext = MockUtil.mockTaskContext - - implicit val system: ActorSystem = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - - val mockTaskActor = TestProbe() - - // Mock self ActorRef - when(taskContext.self).thenReturn(mockTaskActor.ref) - - val conf = UserConfig.empty - val split = new Split(taskContext, conf) - split.onStart(StartTime(0)) - mockTaskActor.expectMsgType[Message] - - val expectedWordCount = Split.TEXT_TO_SPLIT.split( """[\s\n]+""").filter(_.nonEmpty).length - - split.onNext(Message("next")) - verify(taskContext, times(expectedWordCount)).output(anyObject()) - - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SumSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SumSpec.scala b/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SumSpec.scala deleted file mode 100644 index 48a3fa9..0000000 --- a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/SumSpec.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.streaming.examples.wordcount - -import org.scalacheck.Gen -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.task.StartTime - -class SumSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter { - val stringGenerator = Gen.alphaStr - - var wordcount = 0 - - property("Sum should calculate the frequency of the word correctly") { - - val taskContext = MockUtil.mockTaskContext - - val conf = UserConfig.empty - - val sum = new Sum(taskContext, conf) - - sum.onStart(StartTime(0)) - - forAll(stringGenerator) { txt => - wordcount += 1 - sum.onNext(Message(txt)) - } - val all = sum.map.foldLeft(0L) { (total, kv) => - val (_, num) = kv - total + num - } - assert(sum.wordCount == all && sum.wordCount == wordcount) - - sum.reportWordCount() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/WordCountSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/WordCountSpec.scala b/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/WordCountSpec.scala deleted file mode 100644 index 63b2312..0000000 --- a/examples/streaming/wordcount/src/test/scala/io/gearpump/streaming/examples/wordcount/WordCountSpec.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.wordcount - -import scala.concurrent.Future -import scala.util.Success - -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} - -import io.gearpump.cluster.ClientToMaster.SubmitApplication -import io.gearpump.cluster.MasterToClient.SubmitApplicationResult -import io.gearpump.cluster.{MasterHarness, TestUtil} - -class WordCountSpec - extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { - - before { - startActorSystem() - } - - after { - shutdownActorSystem() - } - - protected override def config = TestUtil.DEFAULT_CONFIG - - property("WordCount should succeed to submit application with required arguments") { - val requiredArgs = Array.empty[String] - val optionalArgs = Array( - "-split", "1", - "-sum", "1") - - val args = { - Table( - ("requiredArgs", "optionalArgs"), - (requiredArgs, optionalArgs) - ) - } - val masterReceiver = createMockMaster() - forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => - - val args = requiredArgs ++ optionalArgs - - Future { - WordCount.main(masterConfig, args) - } - - masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) - masterReceiver.reply(SubmitApplicationResult(Success(0))) - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala new file mode 100644 index 0000000..cef9337 --- /dev/null +++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.examples.wordcount + +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +import akka.actor.ActorSystem +import akka.testkit.TestProbe +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.{Matchers, WordSpec} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.task.StartTime + +class SplitSpec extends WordSpec with Matchers { + + "Split" should { + "split the text and deliver to next task" in { + + val taskContext = MockUtil.mockTaskContext + + implicit val system: ActorSystem = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + + val mockTaskActor = TestProbe() + + // Mock self ActorRef + when(taskContext.self).thenReturn(mockTaskActor.ref) + + val conf = UserConfig.empty + val split = new Split(taskContext, conf) + split.onStart(StartTime(0)) + mockTaskActor.expectMsgType[Message] + + val expectedWordCount = Split.TEXT_TO_SPLIT.split( """[\s\n]+""").filter(_.nonEmpty).length + + split.onNext(Message("next")) + verify(taskContext, times(expectedWordCount)).output(anyObject()) + + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala new file mode 100644 index 0000000..e42d696 --- /dev/null +++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SumSpec.scala @@ -0,0 +1,56 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.examples.wordcount + +import org.scalacheck.Gen +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.task.StartTime + +class SumSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter { + val stringGenerator = Gen.alphaStr + + var wordcount = 0 + + property("Sum should calculate the frequency of the word correctly") { + + val taskContext = MockUtil.mockTaskContext + + val conf = UserConfig.empty + + val sum = new Sum(taskContext, conf) + + sum.onStart(StartTime(0)) + + forAll(stringGenerator) { txt => + wordcount += 1 + sum.onNext(Message(txt)) + } + val all = sum.map.foldLeft(0L) { (total, kv) => + val (_, num) = kv + total + num + } + assert(sum.wordCount == all && sum.wordCount == wordcount) + + sum.reportWordCount() + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala new file mode 100644 index 0000000..f703552 --- /dev/null +++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.examples.wordcount + +import scala.concurrent.Future +import scala.util.Success + +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} + +import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication +import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult +import org.apache.gearpump.cluster.{MasterHarness, TestUtil} + +class WordCountSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { + + before { + startActorSystem() + } + + after { + shutdownActorSystem() + } + + protected override def config = TestUtil.DEFAULT_CONFIG + + property("WordCount should succeed to submit application with required arguments") { + val requiredArgs = Array.empty[String] + val optionalArgs = Array( + "-split", "1", + "-sum", "1") + + val args = { + Table( + ("requiredArgs", "optionalArgs"), + (requiredArgs, optionalArgs) + ) + } + val masterReceiver = createMockMaster() + forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => + + val args = requiredArgs ++ optionalArgs + + Future { + WordCount.main(masterConfig, args) + } + + masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) + masterReceiver.reply(SubmitApplicationResult(Success(0))) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala index 8851083..7daebf1 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala @@ -33,8 +33,8 @@ import akka.stream.impl.StreamLayout._ import akka.stream.impl._ import akka.stream.{Graph => 
AkkaGraph} -import _root_.io.gearpump.util -import _root_.io.gearpump.util.Graph +import _root_.org.apache.gearpump.util +import _root_.org.apache.gearpump.util.Graph /** * http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala index b1bf01c..976b1e6 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala @@ -26,7 +26,7 @@ import akka.stream.gearpump.GearpumpMaterializer import akka.stream.gearpump.scaladsl.GearSource import akka.stream.scaladsl.Sink -import io.gearpump.streaming.dsl.CollectionDataSource +import org.apache.gearpump.streaming.dsl.CollectionDataSource /** * read from remote and write to local http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala index b1e2fcb..7b80b7b 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala @@ -26,7 +26,7 @@ import akka.stream.gearpump.GearpumpMaterializer import akka.stream.gearpump.scaladsl.GearSink import akka.stream.scaladsl.Source -import io.gearpump.streaming.dsl.LoggerSink +import org.apache.gearpump.streaming.dsl.LoggerSink /** * read from local and write to remote 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala index 38b40b7..0fccd30 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala @@ -26,7 +26,7 @@ import akka.stream.gearpump.GearpumpMaterializer import akka.stream.gearpump.scaladsl.GearSource import akka.stream.scaladsl.Sink -import io.gearpump.streaming.dsl.CollectionDataSource +import org.apache.gearpump.streaming.dsl.CollectionDataSource /** * WordCount example http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala index c2d0417..56b89bc 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala @@ -31,8 +31,8 @@ import akka.stream.scaladsl._ import akka.util.ByteString import org.json4s.JsonAST.JString -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import io.gearpump.util.AkkaApp +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.util.AkkaApp /** * this example is ported from http://engineering.intenthq.com/2015/06/wikidata-akka-streams/ 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala index ef8c6fa..19083f6 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala @@ -28,7 +28,7 @@ import akka.stream.impl.Stages.DirectProcessor import akka.stream.impl.StreamLayout.{MaterializedValueNode, Module} import akka.stream.impl.{SinkModule, SourceModule} -import io.gearpump.util.Graph +import org.apache.gearpump.util.Graph /** * http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala index 473f32a..6ef8598 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala @@ -28,7 +28,7 @@ import akka.stream.impl.{PublisherSource, SubscriberSink} import akka.stream.{Outlet, SinkShape, SourceShape} import org.reactivestreams.{Publisher, Subscriber} -import io.gearpump.util.Graph +import org.apache.gearpump.util.Graph /** * http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala ---------------------------------------------------------------------- diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala index 5bb9a68..3cea78a 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala @@ -26,10 +26,10 @@ import akka.stream.gearpump.task.SinkBridgeTask.SinkBridgeTaskClient import akka.stream.gearpump.task.SourceBridgeTask.SourceBridgeTaskClient import akka.stream.impl.StreamLayout.Module -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.embedded.EmbeddedCluster -import io.gearpump.streaming.ProcessorId -import io.gearpump.util.Graph +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.embedded.EmbeddedCluster +import org.apache.gearpump.streaming.ProcessorId +import org.apache.gearpump.util.Graph /** * http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala index 4364645..564b6c7 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala @@ -21,7 +21,7 @@ package akka.stream.gearpump.graph import akka.stream.ModuleGraph.Edge import akka.stream.impl.StreamLayout.Module -import io.gearpump.util.Graph +import org.apache.gearpump.util.Graph /** * [[SubGraph]] is a partial part of [[akka.stream.ModuleGraph]] 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala index 78125c8..a5c6e48 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala @@ -29,7 +29,7 @@ import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamSupervisor import akka.stream.{ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Attributes, ClosedShape, Graph => AkkaGraph, MaterializationContext, ModuleGraph} -import io.gearpump.util.Graph +import org.apache.gearpump.util.Graph /** * [[LocalMaterializer]] will use local actor to materialize the graph http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala index 97d8f70..1ec724e 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala @@ -33,7 +33,7 @@ import akka.stream.impl.{ActorProcessorFactory, ActorPublisher, ExposedPublisher import akka.stream.{ActorMaterializerSettings, Attributes, Graph => AkkaGraph, InPort, 
MaterializationContext, Materializer, OutPort, Shape} import org.reactivestreams.{Processor, Publisher, Subscriber} -import io.gearpump.util.Graph +import org.apache.gearpump.util.Graph /** * This materializer is functional equivalent to [[akka.stream.impl.ActorMaterializerImpl]] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala index 9852ed0..47ed1f2 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala @@ -28,11 +28,11 @@ import akka.stream.impl.Stages.StageModule import akka.stream.impl.StreamLayout.Module import org.slf4j.LoggerFactory -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.dsl.StreamApp -import io.gearpump.streaming.dsl.op.{DataSinkOp, DataSourceOp, Direct, FlatMapOp, GroupByOp, MasterOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle, SlaveOp} -import io.gearpump.streaming.{ProcessorId, StreamApplication} -import io.gearpump.util.Graph +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.dsl.StreamApp +import org.apache.gearpump.streaming.dsl.op.{DataSinkOp, DataSourceOp, Direct, FlatMapOp, GroupByOp, MasterOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle, SlaveOp} +import org.apache.gearpump.streaming.{ProcessorId, StreamApplication} +import org.apache.gearpump.util.Graph /** * [[RemoteMaterializerImpl]] will materialize the [[Graph[Module, Edge]] to a Gearpump 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala index 4b7d3ac..c4c78cc 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala @@ -22,10 +22,10 @@ import akka.stream.impl.FlowModule import akka.stream.impl.StreamLayout.Module import akka.stream.{Attributes, Inlet, Outlet, Shape, SinkShape, SourceShape} -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.sink.DataSink -import io.gearpump.streaming.source.DataSource -import io.gearpump.streaming.task.Task +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.task.Task /** * [[GearpumpTaskModule]] represent modules that can be materialized as Gearpump Tasks. 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala index a9f6e97..9e35389 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala @@ -23,10 +23,10 @@ import akka.stream.gearpump.module.{DummySink, DummySource, GroupByModule, Proce import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import org.reactivestreams.{Publisher, Subscriber} -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.sink.DataSink -import io.gearpump.streaming.source.DataSource -import io.gearpump.streaming.task.Task +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.task.Task object GearSource { @@ -61,7 +61,7 @@ object GearSource { } /** - * Construct a Source from Gearpump [[io.gearpump.streaming.Processor]]. + * Construct a Source from Gearpump [[org.apache.gearpump.streaming.Processor]]. * * [[ProcessorModule]] -> downstream Sink * @@ -106,7 +106,7 @@ object GearSink { } /** - * Construct a Sink from Gearpump [[io.gearpump.streaming.Processor]]. + * Construct a Sink from Gearpump [[org.apache.gearpump.streaming.Processor]]. 
* * Upstream Source -> [[ProcessorModule]] * http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala index 58a04ca..2eb0612 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala @@ -18,9 +18,9 @@ package akka.stream.gearpump.task -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.task.TaskContext +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.TaskContext class BalanceTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala index 388806e..925bf21 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala @@ -18,9 +18,9 @@ package akka.stream.gearpump.task -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.task.TaskContext +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.TaskContext class 
BroadcastTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) { override def onNext(msg: Message): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala index d3f483d..9a4e24e 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala @@ -20,10 +20,10 @@ package akka.stream.gearpump.task import akka.stream.gearpump.task.GraphTask.{Index, PortId} -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.ProcessorId -import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskWrapper} +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.ProcessorId +import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskWrapper} class GraphTask(inputTaskContext: TaskContext, userConf: UserConfig) extends Task(inputTaskContext, userConf) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala index d7bacd5..b681852 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala 
@@ -27,13 +27,13 @@ import akka.stream.gearpump.task.SinkBridgeTask.RequestMessage import akka.util.Timeout import org.reactivestreams.{Publisher, Subscriber, Subscription} -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.cluster.client.ClientContext -import io.gearpump.streaming.ProcessorId -import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} -import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} -import io.gearpump.util.LogUtil +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.streaming.ProcessorId +import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} +import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} +import org.apache.gearpump.util.LogUtil /** * Bridge Task when data flow is from remote Gearpump Task to local Akka-Stream Module http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala index b433a7f..ccbd350 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala @@ -24,12 +24,12 @@ import akka.actor.Actor.Receive import akka.stream.gearpump.task.SourceBridgeTask.{AkkaStreamMessage, Complete, Error} import org.reactivestreams.{Subscriber, Subscription} -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.cluster.client.ClientContext -import 
io.gearpump.streaming.ProcessorId -import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} -import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.streaming.ProcessorId +import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} +import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} /** * Bridge Task when data flow is from local Akka-Stream Module to remote Gearpump Task http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala index 0b3b9a5..78fabbe 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala @@ -20,9 +20,9 @@ package akka.stream.gearpump.task import akka.stream.gearpump.task.UnZip2Task.UnZipFunction -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.task.TaskContext +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.TaskContext class UnZip2Task(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CGroupResource.java ---------------------------------------------------------------------- diff --git 
a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CGroupResource.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CGroupResource.java deleted file mode 100644 index 973ad03..0000000 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CGroupResource.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.cluster.cgroup; - -public class CGroupResource { - - private ResourceType type; - - private int hierarchyID; - - private int cgroupsNum; - - private boolean enable; - - public CGroupResource(ResourceType type, int hierarchyID, int cgroupNum, boolean enable) { - this.type = type; - this.hierarchyID = hierarchyID; - this.cgroupsNum = cgroupNum; - this.enable = enable; - } - - public ResourceType getType() { - return type; - } - - public void setType(ResourceType type) { - this.type = type; - } - - public int getHierarchyID() { - return hierarchyID; - } - - public void setHierarchyID(int hierarchyID) { - this.hierarchyID = hierarchyID; - } - - public int getCgroupsNum() { - return cgroupsNum; - } - - public void setCgroupsNum(int cgroupsNum) { - this.cgroupsNum = cgroupsNum; - } - - public boolean isEnable() { - return enable; - } - - public void setEnable(boolean enable) { - this.enable = enable; - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCenter.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCenter.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCenter.java deleted file mode 100644 index dc889ba..0000000 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCenter.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.cluster.cgroup; - -import io.gearpump.cluster.utils.SystemOperation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.util.*; - -public class CgroupCenter implements CgroupOperation { - - public static Logger LOG = LoggerFactory.getLogger(CgroupCenter.class); - - private static CgroupCenter instance; - - private CgroupCenter() { - - } - - /** - * Thread unsafe - * - * @return - */ - public synchronized static CgroupCenter getInstance() { - if (instance == null) - instance = new CgroupCenter(); - return CgroupUtils.enabled() ? 
instance : null; - } - - @Override - public List<Hierarchy> getHierarchies() { - // TODO Auto-generated method stub - Map<String, Hierarchy> hierarchies = new HashMap<String, Hierarchy>(); - FileReader reader = null; - BufferedReader br = null; - try { - reader = new FileReader(Constants.MOUNT_STATUS_FILE); - br = new BufferedReader(reader); - String str = null; - while ((str = br.readLine()) != null) { - String[] strSplit = str.split(" "); - if (!strSplit[2].equals("cgroup")) - continue; - String name = strSplit[0]; - String type = strSplit[3]; - String dir = strSplit[1]; - Hierarchy h = hierarchies.get(type); - h = new Hierarchy(name, CgroupUtils.analyse(type), dir); - hierarchies.put(type, h); - } - return new ArrayList<Hierarchy>(hierarchies.values()); - } catch (Exception e) { - LOG.error("Get hierarchies error", e); - } finally { - CgroupUtils.close(reader, br); - } - return null; - } - - @Override - public Set<CGroupResource> getCGroupResources() { - // TODO Auto-generated method stub - Set<CGroupResource> resources = new HashSet<CGroupResource>(); - FileReader reader = null; - BufferedReader br = null; - try { - reader = new FileReader(Constants.CGROUP_STATUS_FILE); - br = new BufferedReader(reader); - String str = null; - while ((str = br.readLine()) != null) { - String[] split = str.split("\t"); - ResourceType type = ResourceType.getResourceType(split[0]); - if (type == null) - continue; - resources.add(new CGroupResource(type, Integer.valueOf(split[1]), Integer.valueOf(split[2]), Integer.valueOf(split[3]).intValue() == 1 ? 
true - : false)); - } - return resources; - } catch (Exception e) { - LOG.error("Get subSystems error ", e); - } finally { - CgroupUtils.close(reader, br); - } - return null; - } - - @Override - public boolean enabled(ResourceType resourceType) { - // TODO Auto-generated method stub - Set<CGroupResource> resources = this.getCGroupResources(); - for (CGroupResource resource : resources) { - if (resource.getType() == resourceType) - return true; - } - return false; - } - - @Override - public Hierarchy busy(ResourceType resourceType) { - List<Hierarchy> hierarchies = this.getHierarchies(); - for (Hierarchy hierarchy : hierarchies) { - for (ResourceType type : hierarchy.getResourceTypes()) { - if (type == resourceType) - return hierarchy; - } - } - return null; - } - - @Override - public Hierarchy mounted(Hierarchy hierarchy) { - // TODO Auto-generated method stub - List<Hierarchy> hierarchies = this.getHierarchies(); - if (CgroupUtils.dirExists(hierarchy.getDir())) { - for (Hierarchy h : hierarchies) { - if (h.equals(hierarchy)) - return h; - } - } - return null; - } - - @Override - public void mount(Hierarchy hierarchy) throws IOException { - // TODO Auto-generated method stub - if (this.mounted(hierarchy) != null) { - LOG.error(hierarchy.getDir() + " is mounted"); - return; - } - Set<ResourceType> resourceTypes = hierarchy.getResourceTypes(); - for (ResourceType type : resourceTypes) { - if (this.busy(type) != null) { - LOG.error("subsystem: " + type.name() + " is busy"); - resourceTypes.remove(type); - } - } - if (resourceTypes.size() == 0) - return; - if (!CgroupUtils.dirExists(hierarchy.getDir())) - new File(hierarchy.getDir()).mkdirs(); - String subSystems = CgroupUtils.reAnalyse(resourceTypes); - SystemOperation.mount(subSystems, hierarchy.getDir(), "cgroup", subSystems); - } - - @Override - public void umount(Hierarchy hierarchy) throws IOException { - // TODO Auto-generated method stub - if (this.mounted(hierarchy) != null) { - 
hierarchy.getRootCgroups().delete(); - SystemOperation.umount(hierarchy.getDir()); - CgroupUtils.deleteDir(hierarchy.getDir()); - } - } - - @Override - public void create(CgroupCommon cgroup) throws SecurityException { - // TODO Auto-generated method stub - if (cgroup.isRoot()) { - LOG.error("You can't create rootCgroup in this function"); - return; - } - CgroupCommon parent = cgroup.getParent(); - while (parent != null) { - if (!CgroupUtils.dirExists(parent.getDir())) { - LOG.error(parent.getDir() + "is not existed"); - return; - } - parent = parent.getParent(); - } - Hierarchy h = cgroup.getHierarchy(); - if (mounted(h) == null) { - LOG.error(h.getDir() + " is not mounted"); - return; - } - if (CgroupUtils.dirExists(cgroup.getDir())) { - LOG.error(cgroup.getDir() + " is existed"); - return; - } - (new File(cgroup.getDir())).mkdir(); - } - - @Override - public void delete(CgroupCommon cgroup) throws IOException { - // TODO Auto-generated method stub - cgroup.delete(); - } - - public static void main(String args[]) { - System.out.println(CgroupCenter.getInstance().getHierarchies().get(0).getRootCgroups().getChildren().size()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommon.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommon.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommon.java deleted file mode 100644 index 5414814..0000000 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommon.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.cluster.cgroup; - -import io.gearpump.cluster.cgroup.core.CgroupCore; - -import java.io.File; -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class CgroupCommon implements CgroupCommonOperation { - - public static final String TASKS = "/tasks"; - public static final String NOTIFY_ON_RELEASE = "/notify_on_release"; - public static final String RELEASE_AGENT = "/release_agent"; - public static final String CGROUP_CLONE_CHILDREN = "/cgroup.clone_children"; - public static final String CGROUP_EVENT_CONTROL = "/cgroup.event_control"; - public static final String CGROUP_PROCS = "/cgroup.procs"; - - private final Hierarchy hierarchy; - - private final String name; - - private final String dir; - - private final CgroupCommon parent; - - private final Map<ResourceType, CgroupCore> cores; - - private final boolean isRoot; - - private final Set<CgroupCommon> children = new HashSet<CgroupCommon>(); - - public CgroupCommon(String name, Hierarchy hierarchy, CgroupCommon parent) { - this.name = parent.getName() + "/" + name; - this.hierarchy = hierarchy; - this.parent = parent; - this.dir = parent.getDir() + "/" + name; - this.init(); - cores = CgroupCoreFactory.getInstance(this.hierarchy.getResourceTypes(), this.dir); - this.isRoot = false; - } - - /** - * rootCgroup - */ - public 
CgroupCommon(Hierarchy hierarchy, String dir) { - this.name = ""; - this.hierarchy = hierarchy; - this.parent = null; - this.dir = dir; - this.init(); - cores = CgroupCoreFactory.getInstance(this.hierarchy.getResourceTypes(), this.dir); - this.isRoot = true; - } - - @Override - public void addTask(int taskId) throws IOException { - // TODO Auto-generated method stub - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, TASKS), String.valueOf(taskId)); - } - - @Override - public Set<Integer> getTasks() throws IOException { - List<String> stringTasks = CgroupUtils.readFileByLine(Constants.getDir(this.dir, TASKS)); - Set<Integer> tasks = new HashSet<Integer>(); - for (String task : stringTasks) { - tasks.add(Integer.valueOf(task)); - } - return tasks; - } - - @Override - public void addProcs(int pid) throws IOException { - // TODO Auto-generated method stub - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_PROCS), String.valueOf(pid)); - } - - @Override - public Set<Integer> getPids() throws IOException { - // TODO Auto-generated method stub - List<String> stringPids = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_PROCS)); - Set<Integer> pids = new HashSet<Integer>(); - for (String task : stringPids) { - pids.add(Integer.valueOf(task)); - } - return pids; - } - - @Override - public void setNotifyOnRelease(boolean flag) throws IOException { - // TODO Auto-generated method stub - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE), flag ? "1" : "0"); - } - - @Override - public boolean getNotifyOnRelease() throws IOException { - return CgroupUtils.readFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE)).get(0).equals("1") ? 
true : false; - } - - @Override - public void setReleaseAgent(String command) throws IOException { - // TODO Auto-generated method stub - if (!this.isRoot) - return; - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, RELEASE_AGENT), command); - } - - @Override - public String getReleaseAgent() throws IOException { - if (!this.isRoot) - return null; - return CgroupUtils.readFileByLine(Constants.getDir(this.dir, RELEASE_AGENT)).get(0); - } - - @Override - public void setCgroupCloneChildren(boolean flag) throws IOException { - // TODO Auto-generated method stub - if (!this.cores.keySet().contains(ResourceType.cpuset)) - return; - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN), flag ? "1" : "0"); - } - - @Override - public boolean getCgroupCloneChildren() throws IOException { - return CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN)).get(0).equals("1") ? true : false; - } - - @Override - public void setEventControl(String eventFd, String controlFd, String... 
args) throws IOException { - // TODO Auto-generated method stub - StringBuilder sb = new StringBuilder(); - sb.append(eventFd); - sb.append(' '); - sb.append(controlFd); - for (String arg : args) { - sb.append(' '); - sb.append(arg); - } - CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_EVENT_CONTROL), sb.toString()); - } - - public Hierarchy getHierarchy() { - return hierarchy; - } - - public String getName() { - return name; - } - - public String getDir() { - return dir; - } - - public CgroupCommon getParent() { - return parent; - } - - public Set<CgroupCommon> getChildren() { - return children; - } - - public boolean isRoot() { - return isRoot; - } - - public Map<ResourceType, CgroupCore> getCores() { - return cores; - } - - public void delete() throws IOException { - this.free(); - if (!this.isRoot) - this.parent.getChildren().remove(this); - } - - private void free() throws IOException { - for (CgroupCommon child : this.children) - child.free(); - if (this.isRoot) - return; - Set<Integer> tasks = this.getTasks(); - if (tasks != null) { - for (Integer task : tasks) { - this.parent.addTask(task); - } - } - CgroupUtils.deleteDir(this.dir); - } - - private void init() { - File file = new File(this.dir); - File[] files = file.listFiles(); - if (files == null) - return; - for (File child : files) { - if (child.isDirectory()) { - this.children.add(new CgroupCommon(child.getName(), this.hierarchy, this)); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommonOperation.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommonOperation.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupCommonOperation.java deleted file mode 100644 index 7465645..0000000 --- 
/**
 * Operations on the control files that every cgroup directory offers,
 * independent of the subsystems attached to it. {@code CgroupCommon} is the
 * implementation contained in this patch.
 */
public interface CgroupCommonOperation {

    /** Adds the task with the given id to the cgroup. */
    void addTask(int taskid) throws IOException;

    /** @return ids of the tasks currently in the cgroup */
    Set<Integer> getTasks() throws IOException;

    /** Adds the process with the given pid to the cgroup. */
    void addProcs(int pid) throws IOException;

    /** @return pids of the processes currently in the cgroup */
    Set<Integer> getPids() throws IOException;

    /** Sets the notify-on-release flag of the cgroup. */
    void setNotifyOnRelease(boolean flag) throws IOException;

    /** @return the current notify-on-release flag */
    boolean getNotifyOnRelease() throws IOException;

    /** Sets the release-agent command. */
    void setReleaseAgent(String command) throws IOException;

    /** @return the release-agent command */
    String getReleaseAgent() throws IOException;

    /** Sets the clone-children flag of the cgroup. */
    void setCgroupCloneChildren(boolean flag) throws IOException;

    /** @return the current clone-children flag */
    boolean getCgroupCloneChildren() throws IOException;

    /**
     * Registers an event notification.
     *
     * @param eventFd   event file descriptor, as text
     * @param controlFd control file descriptor, as text
     * @param args      further arguments passed along with the two descriptors
     */
    void setEventControl(String eventFd, String controlFd, String... args) throws IOException;
}
- */ -package io.gearpump.cluster.cgroup; - -import io.gearpump.cluster.cgroup.core.CgroupCore; -import io.gearpump.cluster.cgroup.core.CpuCore; - -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -public class CgroupCoreFactory { - - public static Map<ResourceType, CgroupCore> getInstance(Set<ResourceType> types, String dir) { - Map<ResourceType, CgroupCore> result = new HashMap<ResourceType, CgroupCore>(); - for (ResourceType type : types) { - switch (type) { - case cpu: - result.put(ResourceType.cpu, new CpuCore(dir)); - break; - default: - break; - } - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupOperation.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupOperation.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupOperation.java deleted file mode 100644 index a3d830a..0000000 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupOperation.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.cluster.cgroup; - -import java.io.IOException; -import java.util.List; -import java.util.Set; - -public interface CgroupOperation { - - public List<Hierarchy> getHierarchies(); - - public Set<CGroupResource> getCGroupResources(); - - public boolean enabled(ResourceType subsystem); - - public Hierarchy busy(ResourceType subsystem); - - public Hierarchy mounted(Hierarchy hierarchy); - - public void mount(Hierarchy hierarchy) throws IOException; - - public void umount(Hierarchy hierarchy) throws IOException; - - public void create(CgroupCommon cgroup) throws SecurityException; - - public void delete(CgroupCommon cgroup) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupUtils.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupUtils.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupUtils.java deleted file mode 100644 index 0a7f97c..0000000 --- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/CgroupUtils.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.cluster.cgroup; - -import io.gearpump.cluster.utils.SystemOperation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.*; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -public class CgroupUtils { - - public static final Logger LOG = LoggerFactory.getLogger(CgroupUtils.class); - - public static void deleteDir(String dir) { - try { - String cmd = "rmdir " + dir; - SystemOperation.exec(cmd); - } catch (IOException e) { - // TODO Auto-generated catch block - LOG.error("rm " + dir + " fail!", e); - } - } - - public static boolean fileExists(String dir) { - File file = new File(dir); - return file.exists(); - } - - public static boolean dirExists(String dir) { - File file = new File(dir); - return file.isDirectory(); - } - - public static Set<ResourceType> analyse(String str) { - Set<ResourceType> result = new HashSet<ResourceType>(); - String[] subSystems = str.split(","); - for (String subSystem : subSystems) { - ResourceType type = ResourceType.getResourceType(subSystem); - if (type != null) - result.add(type); - } - return result; - } - - public static String reAnalyse(Set<ResourceType> subSystems) { - StringBuilder sb = new StringBuilder(); - if (subSystems.size() == 0) - return sb.toString(); - for (ResourceType type : subSystems) { - sb.append(type.name()).append(","); - } - return sb.toString().substring(0, sb.length() - 1); - } - - public static boolean enabled() { - return CgroupUtils.fileExists(Constants.CGROUP_STATUS_FILE); - } - - public static List<String> readFileByLine(String fileDir) throws IOException { - List<String> result = new ArrayList<String>(); - FileReader fileReader = null; - BufferedReader reader = null; - try { - File file = new File(fileDir); - fileReader = new FileReader(file); - reader = new BufferedReader(fileReader); - 
String tempString = null; - while ((tempString = reader.readLine()) != null) { - result.add(tempString); - } - } finally { - CgroupUtils.close(fileReader, reader); - } - return result; - } - - public static void writeFileByLine(String fileDir, List<String> strings) throws IOException { - FileWriter writer = null; - BufferedWriter bw = null; - try { - File file = new File(fileDir); - if (!file.exists()) { - LOG.error(fileDir + " is no existed"); - return; - } - writer = new FileWriter(file, true); - bw = new BufferedWriter(writer); - for (String string : strings) { - bw.write(string); - bw.newLine(); - bw.flush(); - } - } finally { - CgroupUtils.close(writer, bw); - } - } - - public static void writeFileByLine(String fileDir, String string) throws IOException { - FileWriter writer = null; - BufferedWriter bw = null; - try { - File file = new File(fileDir); - if (!file.exists()) { - LOG.error(fileDir + " is no existed"); - return; - } - writer = new FileWriter(file, true); - bw = new BufferedWriter(writer); - bw.write(string); - bw.newLine(); - bw.flush(); - } finally { - CgroupUtils.close(writer, bw); - } - } - - public static void close(FileReader reader, BufferedReader br) { - try { - if (reader != null) - reader.close(); - if (br != null) - br.close(); - } catch (IOException e) { - // TODO Auto-generated catch block - - } - } - - public static void close(FileWriter writer, BufferedWriter bw) { - try { - if (writer != null) - writer.close(); - if (bw != null) - bw.close(); - } catch (IOException e) { - // TODO Auto-generated catch block - - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Constants.java ---------------------------------------------------------------------- diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Constants.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Constants.java deleted file mode 100644 index 
/**
 * File locations shared by the cgroup classes, plus a helper for building
 * paths of control files.
 */
public class Constants {

    /** Status file whose existence {@code CgroupUtils.enabled()} checks. */
    public static final String CGROUP_STATUS_FILE = "/proc/cgroups";

    /** Mount table file. */
    public static final String MOUNT_STATUS_FILE = "/proc/mounts";

    /**
     * Joins a directory and a file-name constant by plain concatenation; no
     * separator is inserted.
     *
     * @param dir      directory part
     * @param constant suffix, e.g. {@code "/tasks"}
     * @return {@code dir} immediately followed by {@code constant}
     */
    public static String getDir(String dir, String constant) {
        StringBuilder path = new StringBuilder();
        path.append(dir);
        path.append(constant);
        return path.toString();
    }
}
