http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala new file mode 100644 index 0000000..796b0d2 --- /dev/null +++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.examples.sol + +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.FiniteDuration + +import akka.actor.Cancellable + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} + +class SOLStreamProcessor(taskContext: TaskContext, conf: UserConfig) + extends Task(taskContext, conf) { + import taskContext.output + + val taskConf = taskContext + + private var msgCount: Long = 0 + private var scheduler: Cancellable = null + private var snapShotWordCount: Long = 0 + private var snapShotTime: Long = 0 + + override def onStart(startTime: StartTime): Unit = { + scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS), + new FiniteDuration(5, TimeUnit.SECONDS))(reportWordCount()) + snapShotTime = System.currentTimeMillis() + } + + override def onNext(msg: Message): Unit = { + output(msg) + msgCount = msgCount + 1 + } + + override def onStop(): Unit = { + if (scheduler != null) { + scheduler.cancel() + } + } + + def reportWordCount(): Unit = { + val current: Long = System.currentTimeMillis() + LOG.info(s"Task ${taskConf.taskId} " + + s"Throughput: ${(msgCount - snapShotWordCount, (current - snapShotTime) / 1000)} " + + s"(words, second)") + snapShotWordCount = msgCount + snapShotTime = current + } +}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala new file mode 100644 index 0000000..84ed038 --- /dev/null +++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.examples.sol + +import java.util.Random + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.examples.sol.SOLStreamProducer._ +import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} + +class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig) + extends Task(taskContext, conf) { + + import taskContext.output + + private val sizeInBytes = conf.getInt(SOLStreamProducer.BYTES_PER_MESSAGE) + .getOrElse(DEFAULT_MESSAGE_SIZE) + private var messages: Array[String] = null + private var rand: Random = null + private var messageCount: Long = 0 + + override def onStart(startTime: StartTime): Unit = { + prepareRandomMessage + self ! Start + } + + private def prepareRandomMessage = { + rand = new Random() + val differentMessages = 100 + messages = new Array(differentMessages) + + 0.until(differentMessages).map { index => + val sb = new StringBuilder(sizeInBytes) + // Even though java encodes strings in UCS2, the serialized version sent by the tuples + // is UTF8, so it should be a single byte + 0.until(sizeInBytes).foldLeft(sb) { (sb, j) => + sb.append(rand.nextInt(9)) + } + messages(index) = sb.toString() + } + } + + override def onNext(msg: Message): Unit = { + val message = messages(rand.nextInt(messages.length)) + output(new Message(message, System.currentTimeMillis())) + messageCount = messageCount + 1L + self ! 
messageSourceMinClock + } + + // messageSourceMinClock represent the min clock of the message source + private def messageSourceMinClock: Message = { + Message("tick", System.currentTimeMillis()) + } +} + +object SOLStreamProducer { + val DEFAULT_MESSAGE_SIZE = 100 + // Bytes + val BYTES_PER_MESSAGE = "bytesPerMessage" + val Start = Message("start") +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala deleted file mode 100644 index 6e266d0..0000000 --- a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.examples.sol - -import scala.concurrent.Future -import scala.util.Success - -import com.typesafe.config.Config -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec} - -import io.gearpump.cluster.ClientToMaster.SubmitApplication -import io.gearpump.cluster.MasterToClient.SubmitApplicationResult -import io.gearpump.cluster.{MasterHarness, TestUtil} - -class SOLSpec - extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness { - override def beforeAll { - startActorSystem() - } - - override def afterAll { - shutdownActorSystem() - } - - override def config: Config = TestUtil.DEFAULT_CONFIG - - property("SOL should succeed to submit application with required arguments") { - val requiredArgs = Array.empty[String] - val optionalArgs = Array( - "-streamProducer", "1", - "-streamProcessor", "1", - "-bytesPerMessage", "100", - "-stages", "10") - - val args = { - Table( - ("requiredArgs", "optionalArgs"), - (requiredArgs, optionalArgs) - ) - } - val masterReceiver = createMockMaster() - forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => - val args = requiredArgs ++ optionalArgs - - Future { - SOL.main(masterConfig, args) - } - - masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) - masterReceiver.reply(SubmitApplicationResult(Success(0))) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala deleted file mode 100644 index f5035b4..0000000 --- 
a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.streaming.examples.sol - -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.{FlatSpec, Matchers} - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.task.StartTime - -class SOLStreamProcessorSpec extends FlatSpec with Matchers { - - it should "pass the message downstream" in { - val stringGenerator = Gen.alphaStr - val context = MockUtil.mockTaskContext - - val sol = new SOLStreamProcessor(context, UserConfig.empty) - sol.onStart(StartTime(0)) - val msg = Message("msg") - sol.onNext(msg) - verify(context, times(1)).output(msg) - sol.onStop() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala 
b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala deleted file mode 100644 index 4bac30c..0000000 --- a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.streaming.examples.sol - -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest.{Matchers, WordSpec} - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.task.StartTime - -class SOLStreamProducerSpec extends WordSpec with Matchers { - - "SOLStreamProducer" should { - "producer message continuously" in { - - val conf = UserConfig.empty.withInt(SOLStreamProducer.BYTES_PER_MESSAGE, 100) - val context = MockUtil.mockTaskContext - - val producer = new SOLStreamProducer(context, conf) - producer.onStart(StartTime(0)) - producer.onNext(Message("msg")) - verify(context).output(any[Message]) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLSpec.scala b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLSpec.scala new file mode 100644 index 0000000..29e8284 --- /dev/null +++ b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLSpec.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.sol + +import scala.concurrent.Future +import scala.util.Success + +import com.typesafe.config.Config +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec} + +import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication +import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult +import org.apache.gearpump.cluster.{MasterHarness, TestUtil} + +class SOLSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness { + override def beforeAll { + startActorSystem() + } + + override def afterAll { + shutdownActorSystem() + } + + override def config: Config = TestUtil.DEFAULT_CONFIG + + property("SOL should succeed to submit application with required arguments") { + val requiredArgs = Array.empty[String] + val optionalArgs = Array( + "-streamProducer", "1", + "-streamProcessor", "1", + "-bytesPerMessage", "100", + "-stages", "10") + + val args = { + Table( + ("requiredArgs", "optionalArgs"), + (requiredArgs, optionalArgs) + ) + } + val masterReceiver = createMockMaster() + forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => + val args = requiredArgs ++ optionalArgs + + Future { + SOL.main(masterConfig, args) + } + + masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) + masterReceiver.reply(SubmitApplicationResult(Success(0))) + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala new file mode 100644 index 0000000..a6cc966 --- /dev/null +++ b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.examples.sol + +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.{FlatSpec, Matchers} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.task.StartTime + +class SOLStreamProcessorSpec extends FlatSpec with Matchers { + + it should "pass the message downstream" in { + val stringGenerator = Gen.alphaStr + val context = MockUtil.mockTaskContext + + val sol = new SOLStreamProcessor(context, UserConfig.empty) + sol.onStart(StartTime(0)) + val msg = Message("msg") + sol.onNext(msg) + verify(context, times(1)).output(msg) + sol.onStop() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala new file mode 100644 index 0000000..2316de8 --- /dev/null +++ b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.examples.sol + +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.{Matchers, WordSpec} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.task.StartTime + +class SOLStreamProducerSpec extends WordSpec with Matchers { + + "SOLStreamProducer" should { + "producer message continuously" in { + + val conf = UserConfig.empty.withInt(SOLStreamProducer.BYTES_PER_MESSAGE, 100) + val context = MockUtil.mockTaskContext + + val producer = new SOLStreamProducer(context, conf) + producer.onStart(StartTime(0)) + producer.onNext(Message("msg")) + verify(context).output(any[Message]) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/resources/state.conf ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/resources/state.conf b/examples/streaming/state/src/main/resources/state.conf index 7e26d4b..a7eec2b 100644 --- a/examples/streaming/state/src/main/resources/state.conf +++ b/examples/streaming/state/src/main/resources/state.conf @@ -1,6 +1,6 @@ state { checkpoint { interval = 1000 # milliseconds - store.factory = io.gearpump.streaming.kafka.KafkaCheckpointStoreFactory + store.factory = org.apache.gearpump.streaming.kafka.KafkaCheckpointStoreFactory } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala deleted file mode 100644 index 3c6cde6..0000000 --- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.examples.state - -import akka.actor.ActorSystem -import org.apache.hadoop.conf.Configuration - -import io.gearpump.cluster.UserConfig -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import io.gearpump.partitioner.HashPartitioner -import io.gearpump.streaming.examples.state.processor.CountProcessor -import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory -import io.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation -import io.gearpump.streaming.kafka.{KafkaSink, KafkaSource, KafkaStorageFactory} -import io.gearpump.streaming.sink.DataSinkProcessor -import io.gearpump.streaming.source.DataSourceProcessor -import io.gearpump.streaming.state.impl.PersistentStateConfig -import io.gearpump.streaming.{Processor, StreamApplication} -import io.gearpump.util.Graph.Node -import io.gearpump.util.{AkkaApp, Graph} - -/** Does exactly-once message count */ -object MessageCountApp extends AkkaApp with ArgumentsParser { - val SOURCE_TASK = "sourceTask" - val COUNT_TASK = "countTask" - val SINK_TASK = "sinkTask" - val SOURCE_TOPIC = "sourceTopic" - val SINK_TOPIC = "sinkTopic" - val ZOOKEEPER_CONNECT = "zookeeperConnect" - val BROKER_LIST = "brokerList" - val DEFAULT_FS = "defaultFS" - - override val options: Array[(String, CLIOption[Any])] = Array( - SOURCE_TASK -> CLIOption[Int]("<how many kafka source tasks>", required = false, - defaultValue = Some(1)), - COUNT_TASK -> CLIOption("<how many count tasks>", required = false, defaultValue = Some(1)), - SINK_TASK -> CLIOption[Int]("<how many kafka sink tasks>", required = false, - defaultValue = Some(1)), - SOURCE_TOPIC -> CLIOption[String]("<kafka source topic>", required = true), - SINK_TOPIC -> CLIOption[String]("<kafka sink topic>", required = true), - ZOOKEEPER_CONNECT -> CLIOption[String]("<Zookeeper connect string, e.g. 
localhost:2181/kafka>", - required = true), - BROKER_LIST -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true), - DEFAULT_FS -> CLIOption[String]("<name of the default file system, e.g. hdfs://localhost:9000>", - required = true) - ) - - def application(config: ParseResult)(implicit system: ActorSystem): StreamApplication = { - val hadoopConfig = new Configuration - hadoopConfig.set("fs.defaultFS", config.getString(DEFAULT_FS)) - val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageCount", hadoopConfig, - // Rotates on 1KB - new FileSizeRotation(1000)) - val taskConfig = UserConfig.empty - .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true) - .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L) - .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, checkpointStoreFactory) - - val zookeeperConnect = config.getString(ZOOKEEPER_CONNECT) - val brokerList = config.getString(BROKER_LIST) - val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, brokerList) - val sourceTopic = config.getString(SOURCE_TOPIC) - val kafkaSource = new KafkaSource(sourceTopic, zookeeperConnect, offsetStorageFactory) - val sourceProcessor = DataSourceProcessor(kafkaSource, config.getInt(SOURCE_TASK)) - val countProcessor = Processor[CountProcessor](config.getInt(COUNT_TASK), taskConf = taskConfig) - val kafkaSink = new KafkaSink(config.getString(SINK_TOPIC), brokerList) - val sinkProcessor = DataSinkProcessor(kafkaSink, config.getInt(SINK_TASK)) - val partitioner = new HashPartitioner() - val graph = Graph(sourceProcessor ~ partitioner - ~> countProcessor ~ partitioner ~> sinkProcessor) - val app = StreamApplication("MessageCount", graph, UserConfig.empty) - app - } - - def main(akkaConf: Config, args: Array[String]): Unit = { - - val config = parse(args) - val context = ClientContext(akkaConf) - implicit val system = context.system - val appId = context.submit(application(config)) - 
context.close() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala deleted file mode 100644 index 6f3bc79..0000000 --- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.examples.state - -import akka.actor.ActorSystem -import org.apache.hadoop.conf.Configuration - -import io.gearpump.cluster.UserConfig -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import io.gearpump.partitioner.HashPartitioner -import io.gearpump.streaming.examples.state.processor.{NumberGeneratorProcessor, WindowAverageProcessor} -import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory -import io.gearpump.streaming.state.impl.{PersistentStateConfig, WindowConfig} -import io.gearpump.streaming.{Processor, StreamApplication} -import io.gearpump.util.Graph.Node -import io.gearpump.util.{AkkaApp, Graph} - -/** Does exactly-once sliding window based average aggregation */ -object WindowAverageApp extends AkkaApp with ArgumentsParser { - - override val options: Array[(String, CLIOption[Any])] = Array( - "gen" -> CLIOption("<how many gen tasks>", required = false, defaultValue = Some(1)), - "window" -> CLIOption("<how mange window tasks", required = false, defaultValue = Some(1)), - "window_size" -> CLIOption("<window size in milliseconds>", required = false, - defaultValue = Some(5000)), - "window_step" -> CLIOption("<window step in milliseconds>", required = false, - defaultValue = Some(5000)) - ) - - def application(config: ParseResult)(implicit system: ActorSystem): StreamApplication = { - val windowSize = config.getInt("window_size") - val windowStep = config.getInt("window_step") - val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageCount", new Configuration) - val taskConfig = UserConfig.empty. 
- withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true) - .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L) - .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, checkpointStoreFactory) - .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep)) - val gen = Processor[NumberGeneratorProcessor](config.getInt("gen")) - val count = Processor[WindowAverageProcessor](config.getInt("window"), taskConf = taskConfig) - val partitioner = new HashPartitioner() - val app = StreamApplication("WindowAverage", Graph(gen ~ partitioner ~> count), - UserConfig.empty) - app - } - - override def main(akkaConf: Config, args: Array[String]): Unit = { - val config = parse(args) - val context = ClientContext(akkaConf) - - implicit val system = context.system - val appId = context.submit(application(config)) - context.close() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala deleted file mode 100644 index 6610b91..0000000 --- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.state.processor - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.monoid.AlgebirdMonoid -import io.gearpump.streaming.serializer.ChillSerializer -import io.gearpump.streaming.state.api.{PersistentState, PersistentTask} -import io.gearpump.streaming.state.impl.NonWindowState -import io.gearpump.streaming.task.TaskContext - -class CountProcessor(taskContext: TaskContext, conf: UserConfig) - extends PersistentTask[Int](taskContext, conf) { - - override def persistentState: PersistentState[Int] = { - import com.twitter.algebird.Monoid.intMonoid - new NonWindowState[Int](new AlgebirdMonoid(intMonoid), new ChillSerializer[Int]) - } - - override def processMessage(state: PersistentState[Int], message: Message): Unit = { - state.update(message.timestamp, 1) - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala deleted file mode 100644 index fa9854f..0000000 --- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala +++ /dev/null @@ -1,42 +0,0 
@@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.state.processor - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} - -class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig) - extends Task(taskContext, conf) { - import taskContext.output - - private var num = 0L - override def onStart(startTime: StartTime): Unit = { - num = startTime.startTime - self ! Message("start") - } - - override def onNext(msg: Message): Unit = { - output(Message(num + "", num)) - num += 1 - - import scala.concurrent.duration._ - taskContext.scheduleOnce(Duration(1, MILLISECONDS))(self ! 
Message("next")) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala deleted file mode 100644 index e7ac9b3..0000000 --- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.examples.state.processor - -import scala.collection.immutable.TreeMap - -import com.twitter.algebird.{AveragedGroup, AveragedValue} -import org.slf4j.Logger - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.monoid.AlgebirdGroup -import io.gearpump.streaming.serializer.ChillSerializer -import io.gearpump.streaming.state.api.{PersistentState, PersistentTask} -import io.gearpump.streaming.state.impl.{Interval, Window, WindowConfig, WindowState} -import io.gearpump.streaming.task.TaskContext -import io.gearpump.util.LogUtil - -object WindowAverageProcessor { - val LOG: Logger = LogUtil.getLogger(classOf[WindowAverageProcessor]) -} - -class WindowAverageProcessor(taskContext: TaskContext, conf: UserConfig) - extends PersistentTask[AveragedValue](taskContext, conf) { - - override def persistentState: PersistentState[AveragedValue] = { - val group = new AlgebirdGroup(AveragedGroup) - val serializer = new ChillSerializer[TreeMap[Interval, AveragedValue]] - val window = new Window(conf.getValue[WindowConfig](WindowConfig.NAME).get) - new WindowState[AveragedValue](group, serializer, taskContext, window) - } - - override def processMessage(state: PersistentState[AveragedValue], - message: Message): Unit = { - val value = AveragedValue(message.msg.asInstanceOf[String].toLong) - state.update(message.timestamp, value) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala new file mode 100644 index 0000000..13bef0d --- /dev/null +++ 
b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.state + +import akka.actor.ActorSystem +import org.apache.hadoop.conf.Configuration + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} +import org.apache.gearpump.partitioner.HashPartitioner +import org.apache.gearpump.streaming.examples.state.processor.CountProcessor +import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory +import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation +import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource, KafkaStorageFactory} +import org.apache.gearpump.streaming.sink.DataSinkProcessor +import org.apache.gearpump.streaming.source.DataSourceProcessor +import org.apache.gearpump.streaming.state.impl.PersistentStateConfig +import org.apache.gearpump.streaming.{Processor, StreamApplication} +import org.apache.gearpump.util.Graph.Node +import org.apache.gearpump.util.{AkkaApp, Graph} 
+ +/** Does exactly-once message count */ +object MessageCountApp extends AkkaApp with ArgumentsParser { + val SOURCE_TASK = "sourceTask" + val COUNT_TASK = "countTask" + val SINK_TASK = "sinkTask" + val SOURCE_TOPIC = "sourceTopic" + val SINK_TOPIC = "sinkTopic" + val ZOOKEEPER_CONNECT = "zookeeperConnect" + val BROKER_LIST = "brokerList" + val DEFAULT_FS = "defaultFS" + + override val options: Array[(String, CLIOption[Any])] = Array( + SOURCE_TASK -> CLIOption[Int]("<how many kafka source tasks>", required = false, + defaultValue = Some(1)), + COUNT_TASK -> CLIOption("<how many count tasks>", required = false, defaultValue = Some(1)), + SINK_TASK -> CLIOption[Int]("<how many kafka sink tasks>", required = false, + defaultValue = Some(1)), + SOURCE_TOPIC -> CLIOption[String]("<kafka source topic>", required = true), + SINK_TOPIC -> CLIOption[String]("<kafka sink topic>", required = true), + ZOOKEEPER_CONNECT -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>", + required = true), + BROKER_LIST -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true), + DEFAULT_FS -> CLIOption[String]("<name of the default file system, e.g. 
hdfs://localhost:9000>", + required = true) + ) + + def application(config: ParseResult)(implicit system: ActorSystem): StreamApplication = { + val hadoopConfig = new Configuration + hadoopConfig.set("fs.defaultFS", config.getString(DEFAULT_FS)) + val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageCount", hadoopConfig, + // Rotates on 1KB + new FileSizeRotation(1000)) + val taskConfig = UserConfig.empty + .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true) + .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L) + .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, checkpointStoreFactory) + + val zookeeperConnect = config.getString(ZOOKEEPER_CONNECT) + val brokerList = config.getString(BROKER_LIST) + val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, brokerList) + val sourceTopic = config.getString(SOURCE_TOPIC) + val kafkaSource = new KafkaSource(sourceTopic, zookeeperConnect, offsetStorageFactory) + val sourceProcessor = DataSourceProcessor(kafkaSource, config.getInt(SOURCE_TASK)) + val countProcessor = Processor[CountProcessor](config.getInt(COUNT_TASK), taskConf = taskConfig) + val kafkaSink = new KafkaSink(config.getString(SINK_TOPIC), brokerList) + val sinkProcessor = DataSinkProcessor(kafkaSink, config.getInt(SINK_TASK)) + val partitioner = new HashPartitioner() + val graph = Graph(sourceProcessor ~ partitioner + ~> countProcessor ~ partitioner ~> sinkProcessor) + val app = StreamApplication("MessageCount", graph, UserConfig.empty) + app + } + + def main(akkaConf: Config, args: Array[String]): Unit = { + + val config = parse(args) + val context = ClientContext(akkaConf) + implicit val system = context.system + val appId = context.submit(application(config)) + context.close() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala 
---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala new file mode 100644 index 0000000..629deb7 --- /dev/null +++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.examples.state + +import akka.actor.ActorSystem +import org.apache.hadoop.conf.Configuration + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} +import org.apache.gearpump.partitioner.HashPartitioner +import org.apache.gearpump.streaming.examples.state.processor.{NumberGeneratorProcessor, WindowAverageProcessor} +import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory +import org.apache.gearpump.streaming.state.impl.{PersistentStateConfig, WindowConfig} +import org.apache.gearpump.streaming.{Processor, StreamApplication} +import org.apache.gearpump.util.Graph.Node +import org.apache.gearpump.util.{AkkaApp, Graph} + +/** Does exactly-once sliding window based average aggregation */ +object WindowAverageApp extends AkkaApp with ArgumentsParser { + + override val options: Array[(String, CLIOption[Any])] = Array( + "gen" -> CLIOption("<how many gen tasks>", required = false, defaultValue = Some(1)), + "window" -> CLIOption("<how many window tasks>", required = false, defaultValue = Some(1)), + "window_size" -> CLIOption("<window size in milliseconds>", required = false, + defaultValue = Some(5000)), + "window_step" -> CLIOption("<window step in milliseconds>", required = false, + defaultValue = Some(5000)) + ) + + def application(config: ParseResult)(implicit system: ActorSystem): StreamApplication = { + val windowSize = config.getInt("window_size") + val windowStep = config.getInt("window_step") + val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageCount", new Configuration) + val taskConfig = UserConfig.empty. 
+ withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true) + .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L) + .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, checkpointStoreFactory) + .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep)) + val gen = Processor[NumberGeneratorProcessor](config.getInt("gen")) + val count = Processor[WindowAverageProcessor](config.getInt("window"), taskConf = taskConfig) + val partitioner = new HashPartitioner() + val app = StreamApplication("WindowAverage", Graph(gen ~ partitioner ~> count), + UserConfig.empty) + app + } + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + val context = ClientContext(akkaConf) + + implicit val system = context.system + val appId = context.submit(application(config)) + context.close() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala new file mode 100644 index 0000000..9650a0a --- /dev/null +++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.state.processor + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.monoid.AlgebirdMonoid +import org.apache.gearpump.streaming.serializer.ChillSerializer +import org.apache.gearpump.streaming.state.api.{PersistentState, PersistentTask} +import org.apache.gearpump.streaming.state.impl.NonWindowState +import org.apache.gearpump.streaming.task.TaskContext + +class CountProcessor(taskContext: TaskContext, conf: UserConfig) + extends PersistentTask[Int](taskContext, conf) { + + override def persistentState: PersistentState[Int] = { + import com.twitter.algebird.Monoid.intMonoid + new NonWindowState[Int](new AlgebirdMonoid(intMonoid), new ChillSerializer[Int]) + } + + override def processMessage(state: PersistentState[Int], message: Message): Unit = { + state.update(message.timestamp, 1) + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala new file mode 100644 index 0000000..0e85f32 --- /dev/null +++ 
b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.state.processor + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} + +class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig) + extends Task(taskContext, conf) { + import taskContext.output + + private var num = 0L + override def onStart(startTime: StartTime): Unit = { + num = startTime.startTime + self ! Message("start") + } + + override def onNext(msg: Message): Unit = { + output(Message(num + "", num)) + num += 1 + + import scala.concurrent.duration._ + taskContext.scheduleOnce(Duration(1, MILLISECONDS))(self ! 
Message("next")) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala new file mode 100644 index 0000000..eea2504 --- /dev/null +++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.examples.state.processor + +import scala.collection.immutable.TreeMap + +import com.twitter.algebird.{AveragedGroup, AveragedValue} +import org.slf4j.Logger + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.monoid.AlgebirdGroup +import org.apache.gearpump.streaming.serializer.ChillSerializer +import org.apache.gearpump.streaming.state.api.{PersistentState, PersistentTask} +import org.apache.gearpump.streaming.state.impl.{Interval, Window, WindowConfig, WindowState} +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.util.LogUtil + +object WindowAverageProcessor { + val LOG: Logger = LogUtil.getLogger(classOf[WindowAverageProcessor]) +} + +class WindowAverageProcessor(taskContext: TaskContext, conf: UserConfig) + extends PersistentTask[AveragedValue](taskContext, conf) { + + override def persistentState: PersistentState[AveragedValue] = { + val group = new AlgebirdGroup(AveragedGroup) + val serializer = new ChillSerializer[TreeMap[Interval, AveragedValue]] + val window = new Window(conf.getValue[WindowConfig](WindowConfig.NAME).get) + new WindowState[AveragedValue](group, serializer, taskContext, window) + } + + override def processMessage(state: PersistentState[AveragedValue], + message: Message): Unit = { + val value = AveragedValue(message.msg.asInstanceOf[String].toLong) + state.update(message.timestamp, value) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala deleted file mode 100644 index 
040c343..0000000 --- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.state - -import scala.concurrent.Future -import scala.util.Success - -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} - -import io.gearpump.cluster.ClientToMaster.SubmitApplication -import io.gearpump.cluster.MasterToClient.SubmitApplicationResult -import io.gearpump.cluster.{MasterHarness, TestUtil} -import io.gearpump.streaming.examples.state.MessageCountApp._ - -class MessageCountAppSpec - extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { - - before { - startActorSystem() - } - - after { - shutdownActorSystem() - } - - protected override def config = TestUtil.DEFAULT_CONFIG - - property("MessageCount should succeed to submit application with required arguments") { - val requiredArgs = Array( - s"-$SOURCE_TOPIC", "source", - s"-$SINK_TOPIC", "sink", - s"-$ZOOKEEPER_CONNECT", "localhost:2181", - s"-$BROKER_LIST", "localhost:9092", - s"-$DEFAULT_FS", "hdfs://localhost:9000" - ) 
- val optionalArgs = Array( - s"-$SOURCE_TASK", "2", - s"-$COUNT_TASK", "2", - s"-$SINK_TASK", "2" - ) - - val args = { - Table( - ("requiredArgs", "optionalArgs"), - (requiredArgs, optionalArgs.take(0)), - (requiredArgs, optionalArgs.take(2)), - (requiredArgs, optionalArgs.take(4)), - (requiredArgs, optionalArgs) - ) - } - - val masterReceiver = createMockMaster() - forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => - val args = requiredArgs ++ optionalArgs - Future { - MessageCountApp.main(masterConfig, args) - } - masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) - masterReceiver.reply(SubmitApplicationResult(Success(0))) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala deleted file mode 100644 index 7c1c798..0000000 --- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.state - -import scala.concurrent.Future -import scala.util.Success - -import com.typesafe.config.Config -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} - -import io.gearpump.cluster.ClientToMaster.SubmitApplication -import io.gearpump.cluster.MasterToClient.SubmitApplicationResult -import io.gearpump.cluster.{MasterHarness, TestUtil} - -class WindowAverageAppSpec - extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { - - before { - startActorSystem() - } - - after { - shutdownActorSystem() - } - - override def config: Config = TestUtil.DEFAULT_CONFIG - - property("WindowAverage should succeed to submit application with required arguments") { - val requiredArgs = Array.empty[String] - val optionalArgs = Array( - "-gen", "2", - "-window", "2", - "-window_size", "5000", - "-window_step", "5000" - ) - - val args = { - Table( - ("requiredArgs", "optionalArgs"), - (requiredArgs, optionalArgs.take(0)), - (requiredArgs, optionalArgs.take(2)), - (requiredArgs, optionalArgs.take(4)), - (requiredArgs, optionalArgs.take(6)), - (requiredArgs, optionalArgs) - ) - } - val masterReceiver = createMockMaster() - forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => - val args = requiredArgs ++ optionalArgs - - Future { - WindowAverageApp.main(masterConfig, args) - } - - masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) - masterReceiver.reply(SubmitApplicationResult(Success(0))) - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala deleted file mode 100644 index a69116b..0000000 --- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.examples.state.processor - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.ActorSystem -import akka.testkit.TestProbe -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.state.api.PersistentTask -import io.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig} -import io.gearpump.streaming.task.{ReportCheckpointClock, StartTime} -import io.gearpump.streaming.transaction.api.CheckpointStoreFactory - -class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers { - - property("CountProcessor should update state") { - - val taskContext = MockUtil.mockTaskContext - - implicit val system = ActorSystem("test") - - val longGen = Gen.chooseNum[Long](1, 1000) - forAll(longGen) { - (num: Long) => - - val conf = UserConfig.empty - .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true) - .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num) - .withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, - new InMemoryCheckpointStoreFactory) - - val count = new CountProcessor(taskContext, conf) - - val appMaster = TestProbe()(system) - when(taskContext.appMaster).thenReturn(appMaster.ref) - - count.onStart(StartTime(0L)) - appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L)) - - for (i <- 0L to num) { - count.onNext(Message("", i)) - count.state.get shouldBe Some(i + 1) - } - // Next checkpoint time is not arrived yet - when(taskContext.upstreamMinClock).thenReturn(0L) - count.onNext(PersistentTask.CHECKPOINT) - appMaster.expectNoMsg(10.milliseconds) - - // Time to checkpoint - when(taskContext.upstreamMinClock).thenReturn(num) - count.onNext(PersistentTask.CHECKPOINT) 
- // Only the state before checkpoint time is checkpointed - appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num)) - } - - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala deleted file mode 100644 index 18a49ac..0000000 --- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.examples.state.processor - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.ActorSystem -import akka.testkit.TestProbe -import org.mockito.Mockito._ -import org.mockito.{Matchers => MockitoMatchers} -import org.scalatest.{Matchers, WordSpec} - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.task.StartTime - -class NumberGeneratorProcessorSpec extends WordSpec with Matchers { - "NumberGeneratorProcessor" should { - "send random numbers" in { - - val taskContext = MockUtil.mockTaskContext - - implicit val system = ActorSystem("test") - - val mockTaskActor = TestProbe() - - // Mock self ActorRef - when(taskContext.self).thenReturn(mockTaskActor.ref) - - val conf = UserConfig.empty - val genNum = new NumberGeneratorProcessor(taskContext, conf) - genNum.onStart(StartTime(0)) - mockTaskActor.expectMsgType[Message] - - genNum.onNext(Message("next")) - verify(taskContext).output(MockitoMatchers.any[Message]) - // mockTaskActor.expectMsgType[Message] - - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala deleted file mode 100644 index a9c52aa..0000000 --- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under 
one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.examples.state.processor - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.ActorSystem -import akka.testkit.TestProbe -import com.twitter.algebird.AveragedValue -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.state.api.PersistentTask -import io.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig, WindowConfig} -import io.gearpump.streaming.task.{ReportCheckpointClock, StartTime} -import io.gearpump.streaming.transaction.api.CheckpointStoreFactory - -class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Matchers { - property("WindowAverageProcessor should update state") { - - implicit val system = ActorSystem("test") - val longGen = Gen.chooseNum[Long](1, 1000) - forAll(longGen, longGen) { - (data: Long, num: Long) => - val taskContext = MockUtil.mockTaskContext - - val windowSize = num - val windowStep = num - - val conf = UserConfig.empty - 
.withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true) - .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num) - .withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, - new InMemoryCheckpointStoreFactory) - .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep)) - - val windowAverage = new WindowAverageProcessor(taskContext, conf) - - val appMaster = TestProbe()(system) - when(taskContext.appMaster).thenReturn(appMaster.ref) - - windowAverage.onStart(StartTime(0L)) - appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L)) - - for (i <- 0L until num) { - windowAverage.onNext(Message("" + data, i)) - windowAverage.state.get shouldBe Some(AveragedValue(i + 1, data)) - } - - // Next checkpoint time is not arrived yet - when(taskContext.upstreamMinClock).thenReturn(0L) - windowAverage.onNext(PersistentTask.CHECKPOINT) - appMaster.expectNoMsg(10.milliseconds) - - // Time to checkpoint - when(taskContext.upstreamMinClock).thenReturn(num) - windowAverage.onNext(PersistentTask.CHECKPOINT) - appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num)) - } - - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala new file mode 100644 index 0000000..729994e --- /dev/null +++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.state + +import scala.concurrent.Future +import scala.util.Success + +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} + +import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication +import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult +import org.apache.gearpump.cluster.{MasterHarness, TestUtil} +import org.apache.gearpump.streaming.examples.state.MessageCountApp._ + +class MessageCountAppSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { + + before { + startActorSystem() + } + + after { + shutdownActorSystem() + } + + protected override def config = TestUtil.DEFAULT_CONFIG + + property("MessageCount should succeed to submit application with required arguments") { + val requiredArgs = Array( + s"-$SOURCE_TOPIC", "source", + s"-$SINK_TOPIC", "sink", + s"-$ZOOKEEPER_CONNECT", "localhost:2181", + s"-$BROKER_LIST", "localhost:9092", + s"-$DEFAULT_FS", "hdfs://localhost:9000" + ) + val optionalArgs = Array( + s"-$SOURCE_TASK", "2", + s"-$COUNT_TASK", "2", + s"-$SINK_TASK", "2" + ) + + val args = { + Table( + ("requiredArgs", "optionalArgs"), + (requiredArgs, 
optionalArgs.take(0)), + (requiredArgs, optionalArgs.take(2)), + (requiredArgs, optionalArgs.take(4)), + (requiredArgs, optionalArgs) + ) + } + + val masterReceiver = createMockMaster() + forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => + val args = requiredArgs ++ optionalArgs + Future { + MessageCountApp.main(masterConfig, args) + } + masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) + masterReceiver.reply(SubmitApplicationResult(Success(0))) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/WindowAverageAppSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/WindowAverageAppSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/WindowAverageAppSpec.scala new file mode 100644 index 0000000..00cb290 --- /dev/null +++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/WindowAverageAppSpec.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.state + +import scala.concurrent.Future +import scala.util.Success + +import com.typesafe.config.Config +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} + +import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication +import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult +import org.apache.gearpump.cluster.{MasterHarness, TestUtil} + +class WindowAverageAppSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { + + before { + startActorSystem() + } + + after { + shutdownActorSystem() + } + + override def config: Config = TestUtil.DEFAULT_CONFIG + + property("WindowAverage should succeed to submit application with required arguments") { + val requiredArgs = Array.empty[String] + val optionalArgs = Array( + "-gen", "2", + "-window", "2", + "-window_size", "5000", + "-window_step", "5000" + ) + + val args = { + Table( + ("requiredArgs", "optionalArgs"), + (requiredArgs, optionalArgs.take(0)), + (requiredArgs, optionalArgs.take(2)), + (requiredArgs, optionalArgs.take(4)), + (requiredArgs, optionalArgs.take(6)), + (requiredArgs, optionalArgs) + ) + } + val masterReceiver = createMockMaster() + forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => + val args = requiredArgs ++ optionalArgs + + Future { + WindowAverageApp.main(masterConfig, args) + } + + masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) + masterReceiver.reply(SubmitApplicationResult(Success(0))) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala ---------------------------------------------------------------------- diff --git 
a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala new file mode 100644 index 0000000..cdc8cb2 --- /dev/null +++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.examples.state.processor + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor.ActorSystem +import akka.testkit.TestProbe +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.state.api.PersistentTask +import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig} +import org.apache.gearpump.streaming.task.{ReportCheckpointClock, StartTime} +import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory + +class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers { + + property("CountProcessor should update state") { + + val taskContext = MockUtil.mockTaskContext + + implicit val system = ActorSystem("test") + + val longGen = Gen.chooseNum[Long](1, 1000) + forAll(longGen) { + (num: Long) => + + val conf = UserConfig.empty + .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true) + .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num) + .withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, + new InMemoryCheckpointStoreFactory) + + val count = new CountProcessor(taskContext, conf) + + val appMaster = TestProbe()(system) + when(taskContext.appMaster).thenReturn(appMaster.ref) + + count.onStart(StartTime(0L)) + appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L)) + + for (i <- 0L to num) { + count.onNext(Message("", i)) + count.state.get shouldBe Some(i + 1) + } + // Next checkpoint time is not arrived yet + when(taskContext.upstreamMinClock).thenReturn(0L) + count.onNext(PersistentTask.CHECKPOINT) + appMaster.expectNoMsg(10.milliseconds) + + // Time to checkpoint + 
when(taskContext.upstreamMinClock).thenReturn(num) + count.onNext(PersistentTask.CHECKPOINT) + // Only the state before checkpoint time is checkpointed + appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num)) + } + + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala new file mode 100644 index 0000000..2268994 --- /dev/null +++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.examples.state.processor + +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +import akka.actor.ActorSystem +import akka.testkit.TestProbe +import org.mockito.Mockito._ +import org.mockito.{Matchers => MockitoMatchers} +import org.scalatest.{Matchers, WordSpec} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.task.StartTime + +class NumberGeneratorProcessorSpec extends WordSpec with Matchers { + "NumberGeneratorProcessor" should { + "send random numbers" in { + + val taskContext = MockUtil.mockTaskContext + + implicit val system = ActorSystem("test") + + val mockTaskActor = TestProbe() + + // Mock self ActorRef + when(taskContext.self).thenReturn(mockTaskActor.ref) + + val conf = UserConfig.empty + val genNum = new NumberGeneratorProcessor(taskContext, conf) + genNum.onStart(StartTime(0)) + mockTaskActor.expectMsgType[Message] + + genNum.onNext(Message("next")) + verify(taskContext).output(MockitoMatchers.any[Message]) + // mockTaskActor.expectMsgType[Message] + + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + } +}
