http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala new file mode 100644 index 0000000..5c75904 --- /dev/null +++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.examples.fsio + +import org.apache.hadoop.conf.Configuration +import org.slf4j.Logger + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} +import org.apache.gearpump.partitioner.ShufflePartitioner +import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._ +import org.apache.gearpump.streaming.{Processor, StreamApplication} +import org.apache.gearpump.util.Graph._ +import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil} + +object SequenceFileIO extends AkkaApp with ArgumentsParser { + private val LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array( + "source" -> CLIOption[Int]("<sequence file reader number>", required = false, + defaultValue = Some(1)), + "sink" -> CLIOption[Int]("<sequence file writer number>", required = false, + defaultValue = Some(1)), + "input" -> CLIOption[String]("<input file path>", required = true), + "output" -> CLIOption[String]("<output file directory>", required = true) + ) + + def application(config: ParseResult): StreamApplication = { + val spoutNum = config.getInt("source") + val boltNum = config.getInt("sink") + val input = config.getString("input") + val output = config.getString("output") + val appConfig = UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH, input) + .withString(SeqFileStreamProcessor.OUTPUT_PATH, output) + val hadoopConfig = appConfig.withHadoopConf(new Configuration()) + val partitioner = new ShufflePartitioner() + val streamProducer = Processor[SeqFileStreamProducer](spoutNum) + val streamProcessor = Processor[SeqFileStreamProcessor](boltNum) + + val app = StreamApplication("SequenceFileIO", + Graph(streamProducer ~ partitioner ~> streamProcessor), hadoopConfig) + app + } + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + val context = ClientContext(akkaConf) + val appId = context.submit(application(config)) + context.close() + } +}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala deleted file mode 100644 index e5dbe0b..0000000 --- a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.fsio - -import org.apache.hadoop.conf.Configuration -import org.scalatest.{Matchers, WordSpec} - -import io.gearpump.cluster.UserConfig - -class HadoopConfigSpec extends WordSpec with Matchers { - - "HadoopConfig" should { - "serialize and deserialze hadoop configuration properly" in { - val hadoopConf = new Configuration() - val key = "test_key" - val value = "test_value" - hadoopConf.set(key, value) - - val user = UserConfig.empty - - import io.gearpump.streaming.examples.fsio.HadoopConfig._ - assert(user.withHadoopConf(hadoopConf).hadoopConf.get(key) == value) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala deleted file mode 100644 index bb0d26b..0000000 --- a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.fsio - -import java.io.File -import scala.collection.mutable.ArrayBuffer - -import akka.actor.ActorSystem -import akka.testkit.TestProbe -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.io.SequenceFile.Reader -import org.apache.hadoop.io.{SequenceFile, Text} -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} - -import io.gearpump.Message -import io.gearpump.cluster.{TestUtil, UserConfig} -import io.gearpump.streaming.task.{StartTime, TaskId} -import io.gearpump.streaming.{MockUtil, Processor} -class SeqFileStreamProcessorSpec - extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter { - - val kvPairs = new ArrayBuffer[(String, String)] - val outputDirectory = "SeqFileStreamProcessor_Test" - val sequenceFilePath = new Path(outputDirectory + File.separator + TaskId(0, 0)) - val hadoopConf = new Configuration() - val fs = FileSystem.get(hadoopConf) - val textClass = new Text().getClass - val _key = new Text() - val _value = new Text() - - val kvGenerator = for { - key <- Gen.alphaStr - value <- Gen.alphaStr - } yield (key, value) - - before { - implicit val system1 = ActorSystem("SeqFileStreamProcessor", TestUtil.DEFAULT_CONFIG) - val system2 = ActorSystem("Reporter", TestUtil.DEFAULT_CONFIG) - val watcher = TestProbe()(system1) - val conf = HadoopConfig(UserConfig.empty.withString(SeqFileStreamProcessor.OUTPUT_PATH, - outputDirectory)).withHadoopConf(new Configuration()) - val context = MockUtil.mockTaskContext - - val processorDescription = - Processor.ProcessorToProcessorDescription(id = 0, Processor[SeqFileStreamProcessor](1)) - - val taskId = TaskId(0, 0) - when(context.taskId).thenReturn(taskId) - - val processor = new SeqFileStreamProcessor(context, conf) - processor.onStart(StartTime(0)) - - forAll(kvGenerator) { kv => - val (key, value) = kv - kvPairs.append((key, value)) - processor.onNext(Message(key + "++" + value)) - } - processor.onStop() - } - - property("SeqFileStreamProcessor should write the key-value pairs to a sequence file") { - val reader = new SequenceFile.Reader(hadoopConf, Reader.file(sequenceFilePath)) - kvPairs.foreach { kv => - val (key, value) = kv - if (value.length > 0 && reader.next(_key, _value)) { - assert(_key.toString == key && _value.toString == value) - } - } - reader.close() - } - - after { - fs.deleteOnExit(new Path(outputDirectory)) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala deleted file mode 100644 index 04dafa7..0000000 --- a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.fsio - -import scala.collection.mutable.ArrayBuffer - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.io.SequenceFile.Writer -import org.apache.hadoop.io.{SequenceFile, Text} -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.MockUtil._ -import io.gearpump.streaming.task.StartTime - -class SeqFileStreamProducerSpec - extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter { - - val kvPairs = new ArrayBuffer[(String, String)] - val inputFile = "SeqFileStreamProducer_Test" - val sequenceFilePath = new Path(inputFile) - val hadoopConf = new Configuration() - val fs = FileSystem.get(hadoopConf) - val textClass = new Text().getClass - val _key = new Text() - val _value = new Text() - - val kvGenerator = for { - key <- Gen.alphaStr - value <- Gen.alphaStr - } yield (key, value) - - before { - fs.deleteOnExit(sequenceFilePath) - val writer = SequenceFile.createWriter(hadoopConf, Writer.file(sequenceFilePath), - Writer.keyClass(textClass), Writer.valueClass(textClass)) - forAll(kvGenerator) { kv => - _key.set(kv._1) - _value.set(kv._2) - kvPairs.append((kv._1, kv._2)) - writer.append(_key, _value) - } - writer.close() - } - - property("SeqFileStreamProducer should read the key-value pairs from " + - "a sequence file and deliver them") { - - val conf = HadoopConfig(UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH, - inputFile)).withHadoopConf(new Configuration()) - - val context = MockUtil.mockTaskContext - - val producer = new SeqFileStreamProducer(context, conf) - producer.onStart(StartTime(0)) - producer.onNext(Message("start")) - - val expected = kvPairs.map(kv => kv._1 + "++" + kv._2).toSet - verify(context).output(argMatch[Message](msg => - expected.contains(msg.msg.asInstanceOf[String]))) - } - - after { - fs.deleteOnExit(sequenceFilePath) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala deleted file mode 100644 index efb5e44..0000000 --- a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.fsio - -import scala.concurrent.Future -import scala.util.{Success, Try} - -import com.typesafe.config.Config -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec} - -import io.gearpump.cluster.ClientToMaster.SubmitApplication -import io.gearpump.cluster.MasterToClient.SubmitApplicationResult -import io.gearpump.cluster.{MasterHarness, TestUtil} - -class SequenceFileIOSpec - extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness { - - override def beforeAll { - startActorSystem() - } - - override def afterAll { - shutdownActorSystem() - } - - override def config: Config = TestUtil.DEFAULT_CONFIG - - property("SequenceFileIO should succeed to submit application with required arguments") { - val requiredArgs = Array( - "-input", "/tmp/input", - "-output", "/tmp/output" - ) - val optionalArgs = Array( - "-source", "1", - "-sink", "1" - ) - val validArgs = { - Table( - ("requiredArgs", "optionalArgs"), - (requiredArgs, optionalArgs) - ) - } - val masterReceiver = createMockMaster() - forAll(validArgs) { (requiredArgs: Array[String], optionalArgs: Array[String]) => - val args = requiredArgs ++ optionalArgs - - Future { - SequenceFileIO.main(masterConfig, args) - } - masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) - masterReceiver.reply(SubmitApplicationResult(Success(0))) - } - - val invalidArgs = { - Table( - ("requiredArgs", "optionalArgs"), - (requiredArgs.take(0), optionalArgs), - (requiredArgs.take(2), optionalArgs) - ) - } - forAll(invalidArgs) { (requiredArgs: Array[String], optionalArgs: Array[String]) => - val args = optionalArgs - assert(Try(SequenceFileIO.main(args)).isFailure, "missing required arguments, print usage") - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala new file mode 100644 index 0000000..cf1bea8 --- /dev/null +++ b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.examples.fsio + +import org.apache.hadoop.conf.Configuration +import org.scalatest.{Matchers, WordSpec} + +import org.apache.gearpump.cluster.UserConfig + +class HadoopConfigSpec extends WordSpec with Matchers { + + "HadoopConfig" should { + "serialize and deserialze hadoop configuration properly" in { + val hadoopConf = new Configuration() + val key = "test_key" + val value = "test_value" + hadoopConf.set(key, value) + + val user = UserConfig.empty + + import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._ + assert(user.withHadoopConf(hadoopConf).hadoopConf.get(key) == value) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala new file mode 100644 index 0000000..7831b14 --- /dev/null +++ b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.examples.fsio + +import java.io.File +import scala.collection.mutable.ArrayBuffer + +import akka.actor.ActorSystem +import akka.testkit.TestProbe +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.io.SequenceFile.Reader +import org.apache.hadoop.io.{SequenceFile, Text} +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.streaming.task.{StartTime, TaskId} +import org.apache.gearpump.streaming.{MockUtil, Processor} +class SeqFileStreamProcessorSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter { + + val kvPairs = new ArrayBuffer[(String, String)] + val outputDirectory = "SeqFileStreamProcessor_Test" + val sequenceFilePath = new Path(outputDirectory + File.separator + TaskId(0, 0)) + val hadoopConf = new Configuration() + val fs = FileSystem.get(hadoopConf) + val textClass = new Text().getClass + val _key = new Text() + val _value = new Text() + + val kvGenerator = for { + key <- Gen.alphaStr + value <- Gen.alphaStr + } yield (key, value) + + before { + implicit val system1 = ActorSystem("SeqFileStreamProcessor", TestUtil.DEFAULT_CONFIG) + val system2 = ActorSystem("Reporter", TestUtil.DEFAULT_CONFIG) + val watcher = TestProbe()(system1) + val conf = HadoopConfig(UserConfig.empty.withString(SeqFileStreamProcessor.OUTPUT_PATH, + outputDirectory)).withHadoopConf(new Configuration()) + val context = MockUtil.mockTaskContext + + val processorDescription = + Processor.ProcessorToProcessorDescription(id = 0, Processor[SeqFileStreamProcessor](1)) + + val taskId = TaskId(0, 0) + when(context.taskId).thenReturn(taskId) + + val processor = new SeqFileStreamProcessor(context, conf) + processor.onStart(StartTime(0)) + + forAll(kvGenerator) { kv => + val (key, value) = kv + kvPairs.append((key, value)) + processor.onNext(Message(key + "++" + value)) + } + processor.onStop() + } + + property("SeqFileStreamProcessor should write the key-value pairs to a sequence file") { + val reader = new SequenceFile.Reader(hadoopConf, Reader.file(sequenceFilePath)) + kvPairs.foreach { kv => + val (key, value) = kv + if (value.length > 0 && reader.next(_key, _value)) { + assert(_key.toString == key && _value.toString == value) + } + } + reader.close() + } + + after { + fs.deleteOnExit(new Path(outputDirectory)) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala new file mode 100644 index 0000000..ad27e63 --- /dev/null +++ b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.examples.fsio + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.io.SequenceFile.Writer +import org.apache.hadoop.io.{SequenceFile, Text} +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.MockUtil._ +import org.apache.gearpump.streaming.task.StartTime + +class SeqFileStreamProducerSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter { + + val kvPairs = new ArrayBuffer[(String, String)] + val inputFile = "SeqFileStreamProducer_Test" + val sequenceFilePath = new Path(inputFile) + val hadoopConf = new Configuration() + val fs = FileSystem.get(hadoopConf) + val textClass = new Text().getClass + val _key = new Text() + val _value = new Text() + + val kvGenerator = for { + key <- Gen.alphaStr + value <- Gen.alphaStr + } yield (key, value) + + before { + fs.deleteOnExit(sequenceFilePath) + val writer = SequenceFile.createWriter(hadoopConf, Writer.file(sequenceFilePath), + Writer.keyClass(textClass), Writer.valueClass(textClass)) + forAll(kvGenerator) { kv => + _key.set(kv._1) + _value.set(kv._2) + kvPairs.append((kv._1, kv._2)) + writer.append(_key, _value) + } + writer.close() + } + + property("SeqFileStreamProducer should read the key-value pairs from " + + "a sequence file and deliver them") { + + val conf = HadoopConfig(UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH, + inputFile)).withHadoopConf(new Configuration()) + + val context = MockUtil.mockTaskContext + + val producer = new SeqFileStreamProducer(context, conf) + producer.onStart(StartTime(0)) + producer.onNext(Message("start")) + + val expected = kvPairs.map(kv => kv._1 + "++" + kv._2).toSet + verify(context).output(argMatch[Message](msg => + expected.contains(msg.msg.asInstanceOf[String]))) + } + + after { + fs.deleteOnExit(sequenceFilePath) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala new file mode 100644 index 0000000..ba65802 --- /dev/null +++ b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.fsio + +import scala.concurrent.Future +import scala.util.{Success, Try} + +import com.typesafe.config.Config +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec} + +import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication +import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult +import org.apache.gearpump.cluster.{MasterHarness, TestUtil} + +class SequenceFileIOSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness { + + override def beforeAll { + startActorSystem() + } + + override def afterAll { + shutdownActorSystem() + } + + override def config: Config = TestUtil.DEFAULT_CONFIG + + property("SequenceFileIO should succeed to submit application with required arguments") { + val requiredArgs = Array( + "-input", "/tmp/input", + "-output", "/tmp/output" + ) + val optionalArgs = Array( + "-source", "1", + "-sink", "1" + ) + val validArgs = { + Table( + ("requiredArgs", "optionalArgs"), + (requiredArgs, optionalArgs) + ) + } + val masterReceiver = createMockMaster() + forAll(validArgs) { (requiredArgs: Array[String], optionalArgs: Array[String]) => + val args = requiredArgs ++ optionalArgs + + Future { + SequenceFileIO.main(masterConfig, args) + } + masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) + masterReceiver.reply(SubmitApplicationResult(Success(0))) + } + + val invalidArgs = { + Table( + ("requiredArgs", "optionalArgs"), + (requiredArgs.take(0), optionalArgs), + (requiredArgs.take(2), optionalArgs) + ) + } + forAll(invalidArgs) { (requiredArgs: Array[String], optionalArgs: Array[String]) => + val args = optionalArgs + assert(Try(SequenceFileIO.main(args)).isFailure, "missing required arguments, print usage") + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/README.md ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/README.md b/examples/streaming/kafka/README.md index bac7101..587271c 100644 --- a/examples/streaming/kafka/README.md +++ b/examples/streaming/kafka/README.md @@ -81,7 +81,7 @@ Change directory into gearpump root, build gearpump with `sbt pack` and launch a Finally, let's run the KafkaWordCount example. ```bash - ./target/pack/bin/gear app -jar ./examples/target/$SCALA_VERSION_MAJOR/gearpump-examples-assembly-$VERSION.jar io.gearpump.streaming.examples.kafka.wordcount.KafkaWordCount + ./target/pack/bin/gear app -jar ./examples/target/$SCALA_VERSION_MAJOR/gearpump-examples-assembly-$VERSION.jar org.apache.gearpump.streaming.examples.kafka.wordcount.KafkaWordCount ``` One more step is to verify that we've succeeded in producing data to Kafka. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala deleted file mode 100644 index 35b6594..0000000 --- a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.kafka - -import akka.actor.ActorSystem -import org.slf4j.Logger - -import io.gearpump.cluster.UserConfig -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import io.gearpump.partitioner.ShufflePartitioner -import io.gearpump.streaming.StreamApplication -import io.gearpump.streaming.kafka.{KafkaSink, KafkaSource, KafkaStorageFactory} -import io.gearpump.streaming.sink.DataSinkProcessor -import io.gearpump.streaming.source.DataSourceProcessor -import io.gearpump.util.Graph._ -import io.gearpump.util.{AkkaApp, Graph, LogUtil} - -object KafkaReadWrite extends AkkaApp with ArgumentsParser { - private val LOG: Logger = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = Array( - "source" -> CLIOption[Int]("<hom many kafka producer tasks>", required = false, - defaultValue = Some(1)), - "sink" -> CLIOption[Int]("<hom many kafka processor tasks>", required = false, - defaultValue = Some(1)), - "zookeeperConnect" -> CLIOption[String]("<zookeeper connect string>", required = false, - defaultValue = Some("localhost:2181")), - "brokerList" -> CLIOption[String]("<broker server list string>", required = false, - defaultValue = Some("localhost:9092")), - "sourceTopic" -> CLIOption[String]("<kafka source topic>", required = false, - defaultValue = Some("topic1")), - "sinkTopic" -> CLIOption[String]("<kafka sink topic>", required = false, - defaultValue = Some("topic2")) - ) - - def application(config: ParseResult, system: ActorSystem): StreamApplication = { - implicit val actorSystem = system - val sourceNum = config.getInt("source") - val sinkNum = config.getInt("sink") - val zookeeperConnect = config.getString("zookeeperConnect") - val brokerList = config.getString("brokerList") - val sourceTopic = config.getString("sourceTopic") - val sinkTopic = config.getString("sinkTopic") - - val appConfig = UserConfig.empty - val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, brokerList) - val source = new KafkaSource(sourceTopic, zookeeperConnect, offsetStorageFactory) - val sourceProcessor = DataSourceProcessor(source, sourceNum) - val sink = new KafkaSink(sinkTopic, brokerList) - val sinkProcessor = DataSinkProcessor(sink, sinkNum) - val partitioner = new ShufflePartitioner - val computation = sourceProcessor ~ partitioner ~> sinkProcessor - val app = StreamApplication("KafkaReadWrite", Graph(computation), appConfig) - app - } - - override def main(akkaConf: Config, args: Array[String]): Unit = { - val config = parse(args) - val context = ClientContext(akkaConf) - val appId = context.submit(application(config, context.system)) - context.close() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala deleted file mode 100644 index 6955bcc..0000000 --- a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.kafka.wordcount - -import akka.actor.ActorSystem -import kafka.api.OffsetRequest -import org.slf4j.Logger - -import io.gearpump.cluster.UserConfig -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import io.gearpump.partitioner.HashPartitioner -import io.gearpump.streaming.kafka.lib.KafkaSourceConfig -import io.gearpump.streaming.kafka.{KafkaSink, KafkaSource, KafkaStorageFactory} -import io.gearpump.streaming.sink.DataSinkProcessor -import io.gearpump.streaming.source.DataSourceProcessor -import io.gearpump.streaming.{Processor, StreamApplication} -import io.gearpump.util.Graph._ -import io.gearpump.util.{AkkaApp, Graph, LogUtil} - -object KafkaWordCount extends AkkaApp with ArgumentsParser { - private val LOG: Logger = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = Array( - "source" -> CLIOption[Int]("<how many kafka source tasks>", required = false, - defaultValue = Some(1)), - "split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)), - "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)), - "sink" -> CLIOption[Int]("<how many kafka sink tasks>", required = false, - defaultValue = Some(1)) - ) - - def application(config: ParseResult, system: ActorSystem): StreamApplication = { - implicit val actorSystem = system - val sourceNum = config.getInt("source") - val splitNum = config.getInt("split") - val sumNum = config.getInt("sum") - val sinkNum = config.getInt("sink") - - val appConfig = UserConfig.empty - val offsetStorageFactory = new KafkaStorageFactory("localhost:2181", "localhost:9092") - val kafkaSourceConfig = new KafkaSourceConfig() - .withConsumerTopics("topic1").withConsumerStartOffset(OffsetRequest.LatestTime) - val source = new KafkaSource(kafkaSourceConfig, offsetStorageFactory) - val sourceProcessor = DataSourceProcessor(source, sourceNum) - val split = Processor[Split](splitNum) - val sum = Processor[Sum](sumNum) - val sink = new KafkaSink("topic2", "localhost:9092") - val sinkProcessor = DataSinkProcessor(sink, sinkNum) - val partitioner = new HashPartitioner - val computation = sourceProcessor ~ partitioner ~> split ~ partitioner ~> - sum ~ partitioner ~> sinkProcessor - val app = StreamApplication("KafkaWordCount", Graph(computation), appConfig) - app - } - - override def main(akkaConf: Config, args: Array[String]): Unit = { - val config = parse(args) - val context = ClientContext(akkaConf) - val appId = context.submit(application(config, context.system)) - context.close() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala deleted file mode 100644 index b46d170..0000000 --- a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.kafka.wordcount - -import com.twitter.bijection.Injection - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} - -class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - import taskContext.output - - override def onStart(startTime: StartTime): Unit = { - } - - override def onNext(msg: Message): Unit = { - Injection.invert[String, Array[Byte]](msg.msg.asInstanceOf[Array[Byte]]) - .foreach(_.split("\\s+").foreach( - word => output(new Message(word, msg.timestamp)))) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala deleted file mode 100644 index 9c67733..0000000 --- a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.kafka.wordcount - -import com.twitter.bijection.Injection - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} - -class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - import taskContext.output - - private[wordcount] var wordcount = Map.empty[String, Long] - - override def onStart(startTime: StartTime): Unit = {} - - override def onNext(message: Message): Unit = { - val word = message.msg.asInstanceOf[String] - val count = wordcount.getOrElse(word, 0L) + 1 - wordcount += word -> count - output(new Message( - Injection[String, Array[Byte]](word) -> - Injection[Long, Array[Byte]](count), - message.timestamp)) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala new file mode 100644 index 0000000..364544b --- /dev/null +++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.kafka + +import akka.actor.ActorSystem +import org.slf4j.Logger + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} +import org.apache.gearpump.partitioner.ShufflePartitioner +import org.apache.gearpump.streaming.StreamApplication +import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource, KafkaStorageFactory} +import org.apache.gearpump.streaming.sink.DataSinkProcessor +import org.apache.gearpump.streaming.source.DataSourceProcessor +import org.apache.gearpump.util.Graph._ +import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil} + +object KafkaReadWrite extends AkkaApp with ArgumentsParser { + private val LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array( + "source" -> CLIOption[Int]("<hom many kafka producer tasks>", required = false, + defaultValue = Some(1)), + "sink" -> CLIOption[Int]("<hom many kafka processor tasks>", required = false, + defaultValue = Some(1)), + "zookeeperConnect" -> CLIOption[String]("<zookeeper connect string>", required = false, + defaultValue = Some("localhost:2181")), + "brokerList" -> CLIOption[String]("<broker server list string>", required = false, + defaultValue = Some("localhost:9092")), + "sourceTopic" -> CLIOption[String]("<kafka source topic>", required = false, + defaultValue = Some("topic1")), + "sinkTopic" -> CLIOption[String]("<kafka sink topic>", required = false, + defaultValue = Some("topic2")) + ) + + def application(config: ParseResult, system: ActorSystem): StreamApplication = { + implicit val actorSystem = system + val sourceNum = config.getInt("source") + val sinkNum = config.getInt("sink") + val zookeeperConnect = config.getString("zookeeperConnect") + val brokerList = config.getString("brokerList") + val sourceTopic = config.getString("sourceTopic") + val sinkTopic = config.getString("sinkTopic") + + val appConfig = UserConfig.empty + val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, brokerList) + val source = new KafkaSource(sourceTopic, zookeeperConnect, offsetStorageFactory) + val sourceProcessor = DataSourceProcessor(source, sourceNum) + val sink = new KafkaSink(sinkTopic, brokerList) + val sinkProcessor = DataSinkProcessor(sink, sinkNum) + val partitioner = new ShufflePartitioner + val computation = sourceProcessor ~ partitioner ~> sinkProcessor + val app = StreamApplication("KafkaReadWrite", Graph(computation), appConfig) + app + } + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + val context = ClientContext(akkaConf) + val appId = context.submit(application(config, context.system)) + context.close() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala new file mode 100644 index 0000000..5ef1e67 --- /dev/null +++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.kafka.wordcount + +import akka.actor.ActorSystem +import kafka.api.OffsetRequest +import org.slf4j.Logger + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} +import org.apache.gearpump.partitioner.HashPartitioner +import org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig +import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource, KafkaStorageFactory} +import org.apache.gearpump.streaming.sink.DataSinkProcessor +import org.apache.gearpump.streaming.source.DataSourceProcessor +import org.apache.gearpump.streaming.{Processor, StreamApplication} +import org.apache.gearpump.util.Graph._ +import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil} + +object KafkaWordCount extends AkkaApp with ArgumentsParser { + private val LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array( + "source" -> CLIOption[Int]("<how many kafka source tasks>", required = false, + defaultValue = Some(1)), + "split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)), + "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)), + "sink" -> CLIOption[Int]("<how many kafka sink tasks>", required = false, + defaultValue = Some(1)) + ) + + def application(config: ParseResult, system: ActorSystem): StreamApplication = { + implicit val actorSystem = system + val sourceNum = config.getInt("source") + val splitNum = config.getInt("split") + val sumNum = config.getInt("sum") + val sinkNum = config.getInt("sink") + + val appConfig = UserConfig.empty + val offsetStorageFactory = new KafkaStorageFactory("localhost:2181", "localhost:9092") + val kafkaSourceConfig = new KafkaSourceConfig() + .withConsumerTopics("topic1").withConsumerStartOffset(OffsetRequest.LatestTime) + val source = new KafkaSource(kafkaSourceConfig, offsetStorageFactory) + val sourceProcessor = DataSourceProcessor(source, sourceNum) + val split = Processor[Split](splitNum) + val sum = Processor[Sum](sumNum) + val sink = new KafkaSink("topic2", "localhost:9092") + val sinkProcessor = DataSinkProcessor(sink, sinkNum) + val partitioner = new HashPartitioner + val computation = sourceProcessor ~ partitioner ~> split ~ partitioner ~> + sum ~ partitioner ~> sinkProcessor + val app = StreamApplication("KafkaWordCount", Graph(computation), appConfig) + app + } + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + val context = ClientContext(akkaConf) + val appId = context.submit(application(config, context.system)) + context.close() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala new file mode 100644 index 0000000..a95f596 --- /dev/null +++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.kafka.wordcount + +import com.twitter.bijection.Injection + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} + +class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + import taskContext.output + + override def onStart(startTime: StartTime): Unit = { + } + + override def onNext(msg: Message): Unit = { + Injection.invert[String, Array[Byte]](msg.msg.asInstanceOf[Array[Byte]]) + .foreach(_.split("\\s+").foreach( + word => output(new Message(word, msg.timestamp)))) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala new file mode 100644 index 0000000..9930b92 --- /dev/null +++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.kafka.wordcount + +import com.twitter.bijection.Injection + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} + +class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + import taskContext.output + + private[wordcount] var wordcount = Map.empty[String, Long] + + override def onStart(startTime: StartTime): Unit = {} + + override def onNext(message: Message): Unit = { + val word = message.msg.asInstanceOf[String] + val count = wordcount.getOrElse(word, 0L) + 1 + wordcount += word -> count + output(new Message( + Injection[String, Array[Byte]](word) -> + Injection[Long, Array[Byte]](count), + message.timestamp)) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala deleted file mode 100644 index 35f7a62..0000000 --- a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.kafka.wordcount - -import scala.concurrent.Future -import scala.util.Success - -import com.typesafe.config.Config -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} - -import io.gearpump.cluster.ClientToMaster.SubmitApplication -import io.gearpump.cluster.MasterToClient.SubmitApplicationResult -import io.gearpump.cluster.{MasterHarness, TestUtil} - -class KafkaWordCountSpec - extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { - - before { - startActorSystem() - } - - after { - shutdownActorSystem() - } - - override def config: Config = TestUtil.DEFAULT_CONFIG - - property("KafkaWordCount should succeed to submit application with required arguments") { - val requiredArgs = Array.empty[String] - val optionalArgs = Array( - "-source", "1", - "-split", "1", - "-sum", "1", - "-sink", "1") - - val args = { - Table( - ("requiredArgs", "optionalArgs"), - (requiredArgs, optionalArgs) - ) - } - val masterReceiver = createMockMaster() - forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => - val args = requiredArgs ++ optionalArgs - - Future { - KafkaWordCount.main(masterConfig, args) - } - - masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) - masterReceiver.reply(SubmitApplicationResult(Success(0))) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala deleted file mode 100644 index 2cc6a16..0000000 --- a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.kafka.wordcount - -import com.twitter.bijection.Injection -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest._ -import org.scalatest.mock.MockitoSugar - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.task.TaskContext - -class SplitSpec extends FlatSpec with Matchers with MockitoSugar { - - it should "split should split the text and deliver to next task" in { - val taskContext = mock[TaskContext] - val split = new Split(taskContext, UserConfig.empty) - - val msg = "this is a test message" - split.onNext(Message(Injection[String, Array[Byte]](msg))) - verify(taskContext, times(msg.split(" ").length)).output(anyObject[Message]) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala deleted file mode 100644 index 4dcb9d7..0000000 --- a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.kafka.wordcount - -import scala.collection.mutable - -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.{FlatSpec, Matchers} - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.task.StartTime - -class SumSpec extends FlatSpec with Matchers { - - it should "sum should calculate the frequency of the word correctly" in { - - val stringGenerator = Gen.alphaStr - val expectedWordCountMap: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]() - - val taskContext = MockUtil.mockTaskContext - - val sum = new Sum(taskContext, UserConfig.empty) - sum.onStart(StartTime(0)) - val str = "once two two three three three" - - var totalWordCount = 0 - stringGenerator.map { word => - totalWordCount += 1 - expectedWordCountMap.put(word, expectedWordCountMap.getOrElse(word, 0L) + 1) - sum.onNext(Message(word)) - } - verify(taskContext, times(totalWordCount)).output(anyObject[Message]) - - expectedWordCountMap.foreach { wordCount => - val (word, count) = wordCount - assert(count == sum.wordcount.get(word).get) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala new file mode 100644 index 0000000..c4c422d --- /dev/null +++ b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.kafka.wordcount + +import scala.concurrent.Future +import scala.util.Success + +import com.typesafe.config.Config +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} + +import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication +import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult +import org.apache.gearpump.cluster.{MasterHarness, TestUtil} + +class KafkaWordCountSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { + + before { + startActorSystem() + } + + after { + shutdownActorSystem() + } + + override def config: Config = TestUtil.DEFAULT_CONFIG + + property("KafkaWordCount should succeed to submit application with required arguments") { + val requiredArgs = Array.empty[String] + val optionalArgs = Array( + "-source", "1", + "-split", "1", + "-sum", "1", + "-sink", "1") + + val args = { + Table( + ("requiredArgs", "optionalArgs"), + (requiredArgs, optionalArgs) + ) + } + val masterReceiver = createMockMaster() + forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => + val args = requiredArgs ++ optionalArgs + + Future { + KafkaWordCount.main(masterConfig, args) + } + + masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) + masterReceiver.reply(SubmitApplicationResult(Success(0))) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala new file mode 100644 index 0000000..893d06d --- /dev/null +++ b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.examples.kafka.wordcount + +import com.twitter.bijection.Injection +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest._ +import org.scalatest.mock.MockitoSugar + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.TaskContext + +class SplitSpec extends FlatSpec with Matchers with MockitoSugar { + + it should "split should split the text and deliver to next task" in { + val taskContext = mock[TaskContext] + val split = new Split(taskContext, UserConfig.empty) + + val msg = "this is a test message" + split.onNext(Message(Injection[String, Array[Byte]](msg))) + verify(taskContext, times(msg.split(" ").length)).output(anyObject[Message]) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala new file mode 100644 index 0000000..3538ece --- /dev/null +++ b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.examples.kafka.wordcount + +import scala.collection.mutable + +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.{FlatSpec, Matchers} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.task.StartTime + +class SumSpec extends FlatSpec with Matchers { + + it should "sum should calculate the frequency of the word correctly" in { + + val stringGenerator = Gen.alphaStr + val expectedWordCountMap: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]() + + val taskContext = MockUtil.mockTaskContext + + val sum = new Sum(taskContext, UserConfig.empty) + sum.onStart(StartTime(0)) + val str = "once two two three three three" + + var totalWordCount = 0 + stringGenerator.map { word => + totalWordCount += 1 + expectedWordCountMap.put(word, expectedWordCountMap.getOrElse(word, 0L) + 1) + sum.onNext(Message(word)) + } + verify(taskContext, times(totalWordCount)).output(anyObject[Message]) + + expectedWordCountMap.foreach { wordCount => + val (word, count) = wordCount + assert(count == sum.wordcount.get(word).get) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/README.md ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/README.md b/examples/streaming/sol/README.md index a8b10b3..d9c8eab 100644 --- a/examples/streaming/sol/README.md +++ b/examples/streaming/sol/README.md @@ -14,5 +14,5 @@ The original code comes from: https://github.com/yahoo/storm-perf-test <B>Example:</B> ``` -bin/gear app -jar examples/gearpump-examples-assembly-$VERSION.jar io.gearpump.streaming.examples.sol.SOL +bin/gear app -jar examples/gearpump-examples-assembly-$VERSION.jar org.apache.gearpump.streaming.examples.sol.SOL ``` http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala deleted file mode 100644 index 10c190c..0000000 --- a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.sol - -import org.slf4j.Logger - -import io.gearpump.cluster.UserConfig -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import io.gearpump.partitioner.ShufflePartitioner -import io.gearpump.streaming.{Processor, StreamApplication} -import io.gearpump.util.Graph._ -import io.gearpump.util.{AkkaApp, Graph, LogUtil} - -object SOL extends AkkaApp with ArgumentsParser { - private val LOG: Logger = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = Array( - "streamProducer" -> CLIOption[Int]("<stream producer number>", required = false, - defaultValue = Some(1)), - "streamProcessor" -> CLIOption[Int]("<stream processor number>", required = false, - defaultValue = Some(1)), - "bytesPerMessage" -> CLIOption[Int]("<size of each message>", required = false, - defaultValue = Some(100)), - "stages" -> CLIOption[Int]("<how many stages to run>", required = false, - defaultValue = Some(2))) - - def application(config: ParseResult): StreamApplication = { - val spoutNum = config.getInt("streamProducer") - val boltNum = config.getInt("streamProcessor") - val bytesPerMessage = config.getInt("bytesPerMessage") - val stages = config.getInt("stages") - val appConfig = UserConfig.empty.withInt(SOLStreamProducer.BYTES_PER_MESSAGE, bytesPerMessage) - val partitioner = new ShufflePartitioner() - val streamProducer = Processor[SOLStreamProducer](spoutNum) - val streamProcessor = Processor[SOLStreamProcessor](boltNum) - var computation = streamProducer ~ partitioner ~> streamProcessor - computation = 0.until(stages - 2).foldLeft(computation) { (c, id) => - c ~ partitioner ~> streamProcessor.copy() - } - val dag = Graph(computation) - val app = StreamApplication("sol", dag, appConfig) - app - } - - override def main(akkaConf: Config, args: Array[String]): Unit = { - val config = parse(args) - val context = ClientContext(akkaConf) - val appId = context.submit(application(config)) - context.close() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessor.scala b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessor.scala deleted file mode 100644 index de1054f..0000000 --- a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessor.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.sol - -import java.util.concurrent.TimeUnit -import scala.concurrent.duration.FiniteDuration - -import akka.actor.Cancellable - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} - -class SOLStreamProcessor(taskContext: TaskContext, conf: UserConfig) - extends Task(taskContext, conf) { - import taskContext.output - - val taskConf = taskContext - - private var msgCount: Long = 0 - private var scheduler: Cancellable = null - private var snapShotWordCount: Long = 0 - private var snapShotTime: Long = 0 - - override def onStart(startTime: StartTime): Unit = { - scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS), - new FiniteDuration(5, TimeUnit.SECONDS))(reportWordCount()) - snapShotTime = System.currentTimeMillis() - } - - override def onNext(msg: Message): Unit = { - output(msg) - msgCount = msgCount + 1 - } - - override def onStop(): Unit = { - if (scheduler != null) { - scheduler.cancel() - } - } - - def reportWordCount(): Unit = { - val current: Long = System.currentTimeMillis() - LOG.info(s"Task ${taskConf.taskId} " + - s"Throughput: ${(msgCount - snapShotWordCount, (current - snapShotTime) / 1000)} " + - s"(words, second)") - snapShotWordCount = msgCount - snapShotTime = current - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProducer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProducer.scala b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProducer.scala deleted file mode 100644 index 5c0f3be..0000000 --- a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProducer.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.sol - -import java.util.Random - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.examples.sol.SOLStreamProducer._ -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} - -class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig) - extends Task(taskContext, conf) { - - import taskContext.output - - private val sizeInBytes = conf.getInt(SOLStreamProducer.BYTES_PER_MESSAGE) - .getOrElse(DEFAULT_MESSAGE_SIZE) - private var messages: Array[String] = null - private var rand: Random = null - private var messageCount: Long = 0 - - override def onStart(startTime: StartTime): Unit = { - prepareRandomMessage - self ! Start - } - - private def prepareRandomMessage = { - rand = new Random() - val differentMessages = 100 - messages = new Array(differentMessages) - - 0.until(differentMessages).map { index => - val sb = new StringBuilder(sizeInBytes) - // Even though java encodes strings in UCS2, the serialized version sent by the tuples - // is UTF8, so it should be a single byte - 0.until(sizeInBytes).foldLeft(sb) { (sb, j) => - sb.append(rand.nextInt(9)) - } - messages(index) = sb.toString() - } - } - - override def onNext(msg: Message): Unit = { - val message = messages(rand.nextInt(messages.length)) - output(new Message(message, System.currentTimeMillis())) - messageCount = messageCount + 1L - self ! messageSourceMinClock - } - - // messageSourceMinClock represent the min clock of the message source - private def messageSourceMinClock: Message = { - Message("tick", System.currentTimeMillis()) - } -} - -object SOLStreamProducer { - val DEFAULT_MESSAGE_SIZE = 100 - // Bytes - val BYTES_PER_MESSAGE = "bytesPerMessage" - val Start = Message("start") -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala new file mode 100644 index 0000000..fb80ad3 --- /dev/null +++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.sol + +import org.slf4j.Logger + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} +import org.apache.gearpump.partitioner.ShufflePartitioner +import org.apache.gearpump.streaming.{Processor, StreamApplication} +import org.apache.gearpump.util.Graph._ +import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil} + +object SOL extends AkkaApp with ArgumentsParser { + private val LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array( + "streamProducer" -> CLIOption[Int]("<stream producer number>", required = false, + defaultValue = Some(1)), + "streamProcessor" -> CLIOption[Int]("<stream processor number>", required = false, + defaultValue = Some(1)), + "bytesPerMessage" -> CLIOption[Int]("<size of each message>", required = false, + defaultValue = Some(100)), + "stages" -> CLIOption[Int]("<how many stages to run>", required = false, + defaultValue = Some(2))) + + def application(config: ParseResult): StreamApplication = { + val spoutNum = config.getInt("streamProducer") + val boltNum = config.getInt("streamProcessor") + val bytesPerMessage = config.getInt("bytesPerMessage") + val stages = config.getInt("stages") + val appConfig = UserConfig.empty.withInt(SOLStreamProducer.BYTES_PER_MESSAGE, bytesPerMessage) + val partitioner = new ShufflePartitioner() + val streamProducer = Processor[SOLStreamProducer](spoutNum) + val streamProcessor = Processor[SOLStreamProcessor](boltNum) + var computation = streamProducer ~ partitioner ~> streamProcessor + computation = 0.until(stages - 2).foldLeft(computation) { (c, id) => + c ~ partitioner ~> streamProcessor.copy() + } + val dag = Graph(computation) + val app = StreamApplication("sol", dag, appConfig) + app + } + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + val context = ClientContext(akkaConf) + val appId = context.submit(application(config)) + context.close() + } +}
