http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessor.scala b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessor.scala index 22c36ef..de1054f 100644 --- a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessor.scala +++ b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessor.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,44 +19,47 @@ package io.gearpump.streaming.examples.sol import java.util.concurrent.TimeUnit +import scala.concurrent.duration.FiniteDuration import akka.actor.Cancellable -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} + import io.gearpump.Message import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.task.{StartTime, Task, TaskContext} -import scala.concurrent.duration.FiniteDuration - -class SOLStreamProcessor(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) { +class SOLStreamProcessor(taskContext: TaskContext, conf: UserConfig) + extends Task(taskContext, conf) { import taskContext.output val taskConf = taskContext - private var msgCount : Long = 0 - private var scheduler : Cancellable = null - private var snapShotWordCount : Long = 0 - private var snapShotTime : Long = 0 + private var msgCount: Long = 0 + private var scheduler: Cancellable = null + private var snapShotWordCount: Long = 0 + private var snapShotTime: Long = 0 - override def onStart(startTime : StartTime) : Unit = { + override def onStart(startTime: StartTime): Unit = { scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS), new FiniteDuration(5, TimeUnit.SECONDS))(reportWordCount()) snapShotTime = System.currentTimeMillis() } - override def onNext(msg : Message) : Unit = { + override def onNext(msg: Message): Unit = { output(msg) msgCount = msgCount + 1 } - override def onStop() : Unit = { + override def onStop(): Unit = { if (scheduler != null) { scheduler.cancel() } } - def reportWordCount() : Unit = { - val current : Long = System.currentTimeMillis() - LOG.info(s"Task ${taskConf.taskId} Throughput: ${(msgCount - snapShotWordCount, (current - snapShotTime) / 1000)} (words, second)") + def reportWordCount(): Unit = { + val current: Long = System.currentTimeMillis() + LOG.info(s"Task ${taskConf.taskId} " + + s"Throughput: ${(msgCount - snapShotWordCount, (current - snapShotTime) / 1000)} " + + s"(words, second)") snapShotWordCount = msgCount snapShotTime = current }
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProducer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProducer.scala b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProducer.scala index 7f285f2..5c0f3be 100644 --- a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProducer.scala +++ b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOLStreamProducer.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,22 +20,23 @@ package io.gearpump.streaming.examples.sol import java.util.Random -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} import io.gearpump.Message import io.gearpump.cluster.UserConfig import io.gearpump.streaming.examples.sol.SOLStreamProducer._ +import io.gearpump.streaming.task.{StartTime, Task, TaskContext} -class SOLStreamProducer(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) { +class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig) + extends Task(taskContext, conf) { import taskContext.output private val sizeInBytes = conf.getInt(SOLStreamProducer.BYTES_PER_MESSAGE) - .getOrElse(DEFAULT_MESSAGE_SIZE) - private var messages : Array[String] = null - private var rand : Random = null - private var messageCount : Long = 0 + .getOrElse(DEFAULT_MESSAGE_SIZE) + private var messages: Array[String] = null + private var rand: Random = null + private var messageCount: Long = 0 - override def onStart(startTime : StartTime) : Unit = { + override def onStart(startTime: StartTime): Unit = { prepareRandomMessage self ! Start } @@ -47,16 +48,16 @@ class SOLStreamProducer(taskContext : TaskContext, conf: UserConfig) extends Tas 0.until(differentMessages).map { index => val sb = new StringBuilder(sizeInBytes) - //Even though java encodes strings in UCS2, the serialized version sent by the tuples + // Even though java encodes strings in UCS2, the serialized version sent by the tuples // is UTF8, so it should be a single byte - 0.until(sizeInBytes).foldLeft(sb){(sb, j) => - sb.append(rand.nextInt(9)); + 0.until(sizeInBytes).foldLeft(sb) { (sb, j) => + sb.append(rand.nextInt(9)) } - messages(index) = sb.toString(); + messages(index) = sb.toString() } } - override def onNext(msg : Message) : Unit = { + override def onNext(msg: Message): Unit = { val message = messages(rand.nextInt(messages.length)) output(new Message(message, System.currentTimeMillis())) messageCount = messageCount + 1L @@ -64,13 +65,14 @@ class SOLStreamProducer(taskContext : TaskContext, conf: UserConfig) extends Tas } // messageSourceMinClock represent the min clock of the message source - private def messageSourceMinClock : Message = { + private def messageSourceMinClock: Message = { Message("tick", System.currentTimeMillis()) } } object SOLStreamProducer { - val DEFAULT_MESSAGE_SIZE = 100 // bytes + val DEFAULT_MESSAGE_SIZE = 100 + // Bytes val BYTES_PER_MESSAGE = "bytesPerMessage" val Start = Message("start") } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala index 1f3dbbd..6e266d0 100644 --- a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala +++ b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,16 +18,19 @@ package io.gearpump.streaming.examples.sol -import io.gearpump.cluster.ClientToMaster.SubmitApplication -import io.gearpump.cluster.MasterToClient.SubmitApplicationResult -import io.gearpump.cluster.{MasterHarness, TestUtil} +import scala.concurrent.Future +import scala.util.Success + +import com.typesafe.config.Config import org.scalatest.prop.PropertyChecks import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec} -import scala.concurrent.Future -import scala.util.Success +import io.gearpump.cluster.ClientToMaster.SubmitApplication +import io.gearpump.cluster.MasterToClient.SubmitApplicationResult +import io.gearpump.cluster.{MasterHarness, TestUtil} -class SOLSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness { +class SOLSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness { override def beforeAll { startActorSystem() } @@ -36,7 +39,7 @@ class SOLSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndA shutdownActorSystem() } - override def config = TestUtil.DEFAULT_CONFIG + override def config: Config = TestUtil.DEFAULT_CONFIG property("SOL should succeed to submit application with required arguments") { val requiredArgs = Array.empty[String] @@ -56,11 +59,12 @@ class SOLSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndA forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => val args = requiredArgs ++ optionalArgs - Future {SOL.main(masterConfig, args)} + Future { + SOL.main(masterConfig, args) + } masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) masterReceiver.reply(SubmitApplicationResult(Success(0))) } } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala index acc7b8f..f5035b4 100644 --- a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala +++ b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,15 +17,14 @@ */ package io.gearpump.streaming.examples.sol -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.task.StartTime -import io.gearpump.Message -import io.gearpump.cluster.UserConfig import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.{FlatSpec, Matchers} -import scala.language.postfixOps +import io.gearpump.Message +import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.MockUtil +import io.gearpump.streaming.task.StartTime class SOLStreamProcessorSpec extends FlatSpec with Matchers { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala index 4ff7d12..4bac30c 100644 --- a/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala +++ b/examples/streaming/sol/src/test/scala/io/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,14 +17,15 @@ */ package io.gearpump.streaming.examples.sol -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.task.StartTime -import io.gearpump.Message -import io.gearpump.cluster.UserConfig import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalatest.{Matchers, WordSpec} +import io.gearpump.Message +import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.MockUtil +import io.gearpump.streaming.task.StartTime + class SOLStreamProducerSpec extends WordSpec with Matchers { "SOLStreamProducer" should { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala index e9ceb8b..3c6cde6 100644 --- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala +++ b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,22 +19,24 @@ package io.gearpump.streaming.examples.state import akka.actor.ActorSystem -import io.gearpump.streaming.kafka.{KafkaSink, KafkaSource, KafkaStorageFactory} -import io.gearpump.streaming.sink.DataSinkProcessor -import io.gearpump.streaming.source.DataSourceProcessor -import io.gearpump.streaming.{StreamApplication, Processor} -import io.gearpump.streaming.examples.state.processor.CountProcessor -import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory -import io.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation -import io.gearpump.streaming.state.impl.PersistentStateConfig +import org.apache.hadoop.conf.Configuration + import io.gearpump.cluster.UserConfig import io.gearpump.cluster.client.ClientContext import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} import io.gearpump.partitioner.HashPartitioner +import io.gearpump.streaming.examples.state.processor.CountProcessor +import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory +import io.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation +import io.gearpump.streaming.kafka.{KafkaSink, KafkaSource, KafkaStorageFactory} +import io.gearpump.streaming.sink.DataSinkProcessor +import io.gearpump.streaming.source.DataSourceProcessor +import io.gearpump.streaming.state.impl.PersistentStateConfig +import io.gearpump.streaming.{Processor, StreamApplication} import io.gearpump.util.Graph.Node import io.gearpump.util.{AkkaApp, Graph} -import org.apache.hadoop.conf.Configuration +/** Does exactly-once message count */ object MessageCountApp extends AkkaApp with ArgumentsParser { val SOURCE_TASK = "sourceTask" val COUNT_TASK = "countTask" @@ -46,21 +48,25 @@ object MessageCountApp extends AkkaApp with ArgumentsParser { val DEFAULT_FS = "defaultFS" override val options: Array[(String, CLIOption[Any])] = Array( - SOURCE_TASK -> CLIOption[Int]("<how many kafka source tasks>", required = false, defaultValue = Some(1)), + SOURCE_TASK -> CLIOption[Int]("<how many kafka source tasks>", required = false, + defaultValue = Some(1)), COUNT_TASK -> CLIOption("<how many count tasks>", required = false, defaultValue = Some(1)), - SINK_TASK -> CLIOption[Int]("<how many kafka sink tasks>", required = false, defaultValue = Some(1)), + SINK_TASK -> CLIOption[Int]("<how many kafka sink tasks>", required = false, + defaultValue = Some(1)), SOURCE_TOPIC -> CLIOption[String]("<kafka source topic>", required = true), SINK_TOPIC -> CLIOption[String]("<kafka sink topic>", required = true), - ZOOKEEPER_CONNECT -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>", required = true), + ZOOKEEPER_CONNECT -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>", + required = true), BROKER_LIST -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true), - DEFAULT_FS -> CLIOption[String]("<name of the default file system, e.g. hdfs://localhost:9000>", required = true) + DEFAULT_FS -> CLIOption[String]("<name of the default file system, e.g. hdfs://localhost:9000>", + required = true) ) - def application(config: ParseResult)(implicit system: ActorSystem) : StreamApplication = { + def application(config: ParseResult)(implicit system: ActorSystem): StreamApplication = { val hadoopConfig = new Configuration hadoopConfig.set("fs.defaultFS", config.getString(DEFAULT_FS)) val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageCount", hadoopConfig, - // rotate on 1KB + // Rotates on 1KB new FileSizeRotation(1000)) val taskConfig = UserConfig.empty .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true) @@ -77,7 +83,8 @@ object MessageCountApp extends AkkaApp with ArgumentsParser { val kafkaSink = new KafkaSink(config.getString(SINK_TOPIC), brokerList) val sinkProcessor = DataSinkProcessor(kafkaSink, config.getInt(SINK_TASK)) val partitioner = new HashPartitioner() - val graph = Graph(sourceProcessor ~ partitioner ~> countProcessor ~ partitioner ~> sinkProcessor) + val graph = Graph(sourceProcessor ~ partitioner + ~> countProcessor ~ partitioner ~> sinkProcessor) val app = StreamApplication("MessageCount", graph, UserConfig.empty) app } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala index 7489808..6f3bc79 100644 --- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala +++ b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/WindowAverageApp.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,40 +19,45 @@ package io.gearpump.streaming.examples.state import akka.actor.ActorSystem -import io.gearpump.streaming.{StreamApplication, Processor} -import io.gearpump.streaming.examples.state.processor.{WindowAverageProcessor, NumberGeneratorProcessor} -import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory -import io.gearpump.streaming.state.impl.{WindowConfig, PersistentStateConfig} +import org.apache.hadoop.conf.Configuration + import io.gearpump.cluster.UserConfig import io.gearpump.cluster.client.ClientContext import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} import io.gearpump.partitioner.HashPartitioner +import io.gearpump.streaming.examples.state.processor.{NumberGeneratorProcessor, WindowAverageProcessor} +import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory +import io.gearpump.streaming.state.impl.{PersistentStateConfig, WindowConfig} +import io.gearpump.streaming.{Processor, StreamApplication} import io.gearpump.util.Graph.Node import io.gearpump.util.{AkkaApp, Graph} -import org.apache.hadoop.conf.Configuration +/** Does exactly-once sliding window based average aggregation */ object WindowAverageApp extends AkkaApp with ArgumentsParser { override val options: Array[(String, CLIOption[Any])] = Array( "gen" -> CLIOption("<how many gen tasks>", required = false, defaultValue = Some(1)), "window" -> CLIOption("<how mange window tasks", required = false, defaultValue = Some(1)), - "window_size" -> CLIOption("<window size in milliseconds>", required = false , defaultValue = Some(5000)), - "window_step" -> CLIOption("<window step in milliseconds>", required = false , defaultValue = Some(5000)) + "window_size" -> CLIOption("<window size in milliseconds>", required = false, + defaultValue = Some(5000)), + "window_step" -> CLIOption("<window step in milliseconds>", required = false, + defaultValue = Some(5000)) ) - def application(config: ParseResult)(implicit system: ActorSystem) : StreamApplication = { + def application(config: ParseResult)(implicit system: ActorSystem): StreamApplication = { val windowSize = config.getInt("window_size") val windowStep = config.getInt("window_step") val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageCount", new Configuration) - val taskConfig = UserConfig.empty - .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true) - .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L) - .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, checkpointStoreFactory) - .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep)) + val taskConfig = UserConfig.empty. + withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true) + .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L) + .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, checkpointStoreFactory) + .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep)) val gen = Processor[NumberGeneratorProcessor](config.getInt("gen")) val count = Processor[WindowAverageProcessor](config.getInt("window"), taskConf = taskConfig) val partitioner = new HashPartitioner() - val app = StreamApplication("WindowAverage", Graph(gen ~ partitioner ~> count), UserConfig.empty) + val app = StreamApplication("WindowAverage", Graph(gen ~ partitioner ~> count), + UserConfig.empty) app } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala index 04cea69..6610b91 100644 --- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala +++ b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,13 +18,13 @@ package io.gearpump.streaming.examples.state.processor +import io.gearpump.Message +import io.gearpump.cluster.UserConfig import io.gearpump.streaming.monoid.AlgebirdMonoid import io.gearpump.streaming.serializer.ChillSerializer -import io.gearpump.streaming.state.api.{PersistentTask, PersistentState} +import io.gearpump.streaming.state.api.{PersistentState, PersistentTask} import io.gearpump.streaming.state.impl.NonWindowState import io.gearpump.streaming.task.TaskContext -import io.gearpump.cluster.UserConfig -import io.gearpump.Message class CountProcessor(taskContext: TaskContext, conf: UserConfig) extends PersistentTask[Int](taskContext, conf) { @@ -39,4 +39,3 @@ class CountProcessor(taskContext: TaskContext, conf: UserConfig) } } - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala index 0dae825..fa9854f 100644 --- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala +++ b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,11 +18,12 @@ package io.gearpump.streaming.examples.state.processor -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} import io.gearpump.Message import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.task.{StartTime, Task, TaskContext} -class NumberGeneratorProcessor(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) { +class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig) + extends Task(taskContext, conf) { import taskContext.output private var num = 0L http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala index 82d6243..e7ac9b3 100644 --- a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala +++ b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,24 +18,25 @@ package io.gearpump.streaming.examples.state.processor +import scala.collection.immutable.TreeMap + import com.twitter.algebird.{AveragedGroup, AveragedValue} +import org.slf4j.Logger + +import io.gearpump.Message +import io.gearpump.cluster.UserConfig import io.gearpump.streaming.monoid.AlgebirdGroup import io.gearpump.streaming.serializer.ChillSerializer -import io.gearpump.streaming.state.api.{PersistentTask, PersistentState} -import io.gearpump.streaming.state.impl.{WindowConfig, WindowState, Interval, Window} +import io.gearpump.streaming.state.api.{PersistentState, PersistentTask} +import io.gearpump.streaming.state.impl.{Interval, Window, WindowConfig, WindowState} import io.gearpump.streaming.task.TaskContext -import io.gearpump.Message -import io.gearpump.cluster.UserConfig import io.gearpump.util.LogUtil -import org.slf4j.Logger - -import scala.collection.immutable.TreeMap object WindowAverageProcessor { val LOG: Logger = LogUtil.getLogger(classOf[WindowAverageProcessor]) } -class WindowAverageProcessor(taskContext : TaskContext, conf: UserConfig) +class WindowAverageProcessor(taskContext: TaskContext, conf: UserConfig) extends PersistentTask[AveragedValue](taskContext, conf) { override def persistentState: PersistentState[AveragedValue] = { @@ -46,7 +47,7 @@ class WindowAverageProcessor(taskContext : TaskContext, conf: UserConfig) } override def processMessage(state: PersistentState[AveragedValue], - message: Message): Unit = { + message: Message): Unit = { val value = AveragedValue(message.msg.asInstanceOf[String].toLong) state.update(message.timestamp, value) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala index 8053f57..040c343 100644 --- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala +++ b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,18 +18,19 @@ package io.gearpump.streaming.examples.state -import io.gearpump.streaming.examples.state.MessageCountApp._ +import scala.concurrent.Future +import scala.util.Success + +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} + import io.gearpump.cluster.ClientToMaster.SubmitApplication import io.gearpump.cluster.MasterToClient.SubmitApplicationResult import io.gearpump.cluster.{MasterHarness, TestUtil} -import org.scalatest.{Matchers, BeforeAndAfter, PropSpec} -import org.scalatest.prop.PropertyChecks - - -import scala.concurrent.Future -import scala.util.Success +import io.gearpump.streaming.examples.state.MessageCountApp._ -class MessageCountAppSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { +class MessageCountAppSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { before { startActorSystem() @@ -39,7 +40,7 @@ class MessageCountAppSpec extends PropSpec with PropertyChecks with Matchers wit shutdownActorSystem() } - override def config = TestUtil.DEFAULT_CONFIG + protected override def config = TestUtil.DEFAULT_CONFIG property("MessageCount should succeed to submit application with required arguments") { val requiredArgs = Array( @@ -68,7 +69,9 @@ class MessageCountAppSpec extends PropSpec with PropertyChecks with Matchers wit val masterReceiver = createMockMaster() forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => val args = requiredArgs ++ optionalArgs - Future {MessageCountApp.main(masterConfig, args)} + Future { + MessageCountApp.main(masterConfig, args) + } masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) masterReceiver.reply(SubmitApplicationResult(Success(0))) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala index 998dd1d..7c1c798 100644 --- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala +++ b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/WindowAverageAppSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,16 +18,20 @@ package io.gearpump.streaming.examples.state +import scala.concurrent.Future +import scala.util.Success + +import com.typesafe.config.Config +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} + import io.gearpump.cluster.ClientToMaster.SubmitApplication import io.gearpump.cluster.MasterToClient.SubmitApplicationResult import io.gearpump.cluster.{MasterHarness, TestUtil} -import org.scalatest.{Matchers, BeforeAndAfter, PropSpec} -import org.scalatest.prop.PropertyChecks -import scala.concurrent.Future -import scala.util.Success +class WindowAverageAppSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { -class WindowAverageAppSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { before { startActorSystem() } @@ -36,7 +40,7 @@ class WindowAverageAppSpec extends PropSpec with PropertyChecks with Matchers wi shutdownActorSystem() } - override def config = TestUtil.DEFAULT_CONFIG + override def config: Config = TestUtil.DEFAULT_CONFIG property("WindowAverage should succeed to submit application with required arguments") { val requiredArgs = Array.empty[String] @@ -61,7 +65,9 @@ class WindowAverageAppSpec extends PropSpec with PropertyChecks with Matchers wi forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => val args = requiredArgs ++ optionalArgs - Future {WindowAverageApp.main(masterConfig, args)} + Future { + WindowAverageApp.main(masterConfig, args) + } masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) masterReceiver.reply(SubmitApplicationResult(Success(0))) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala index cbc9cce..a69116b 100644 --- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala +++ b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,23 +18,23 @@ package io.gearpump.streaming.examples.state.processor +import scala.concurrent.Await +import scala.concurrent.duration._ + import akka.actor.ActorSystem import akka.testkit.TestProbe -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.state.api.PersistentTask -import io.gearpump.streaming.state.impl.PersistentStateConfig -import io.gearpump.streaming.task.ReportCheckpointClock -import io.gearpump.streaming.transaction.api.CheckpointStoreFactory -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.state.impl.InMemoryCheckpointStoreFactory -import io.gearpump.streaming.task.StartTime import org.mockito.Mockito._ import org.scalacheck.Gen -import org.scalatest.{Matchers, PropSpec} import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} -import scala.concurrent.duration._ +import io.gearpump.Message +import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.MockUtil +import io.gearpump.streaming.state.api.PersistentTask +import io.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig} +import io.gearpump.streaming.task.{ReportCheckpointClock, StartTime} +import io.gearpump.streaming.transaction.api.CheckpointStoreFactory class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers { @@ -51,7 +51,8 @@ class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers { val conf = UserConfig.empty .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true) .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num) - .withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, new InMemoryCheckpointStoreFactory) + .withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, + new InMemoryCheckpointStoreFactory) val count = new CountProcessor(taskContext, conf) @@ -61,25 +62,23 @@ class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers { count.onStart(StartTime(0L)) appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L)) - for (i <- 0L to num) { count.onNext(Message("", i)) count.state.get shouldBe Some(i + 1) } - // next checkpoint time is at num - // not yet + // Next checkpoint time is not arrived yet when(taskContext.upstreamMinClock).thenReturn(0L) count.onNext(PersistentTask.CHECKPOINT) - appMaster.expectNoMsg(10 milliseconds) + appMaster.expectNoMsg(10.milliseconds) - // time to checkpoint + // Time to checkpoint when(taskContext.upstreamMinClock).thenReturn(num) count.onNext(PersistentTask.CHECKPOINT) - // only the state before checkpoint time is checkpointed + // Only the state before checkpoint time is checkpointed appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num)) } - system.shutdown() - system.awaitTermination() + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala index 8105c32..18a49ac 100644 --- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala +++ b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,16 +18,20 @@ package io.gearpump.streaming.examples.state.processor +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.ActorSystem import akka.testkit.TestProbe -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.task.StartTime -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import org.mockito.{Matchers => MockitoMatchers} import org.mockito.Mockito._ +import org.mockito.{Matchers => MockitoMatchers} import org.scalatest.{Matchers, WordSpec} +import io.gearpump.Message +import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.MockUtil +import io.gearpump.streaming.task.StartTime + class NumberGeneratorProcessorSpec extends WordSpec with Matchers { "NumberGeneratorProcessor" should { "send random numbers" in { @@ -38,7 +42,7 @@ class NumberGeneratorProcessorSpec extends WordSpec with Matchers { val mockTaskActor = TestProbe() - //mock self ActorRef + // Mock self ActorRef when(taskContext.self).thenReturn(mockTaskActor.ref) val conf = UserConfig.empty @@ -46,13 +50,12 @@ class NumberGeneratorProcessorSpec extends WordSpec with Matchers { genNum.onStart(StartTime(0)) mockTaskActor.expectMsgType[Message] - genNum.onNext(Message("next")) verify(taskContext).output(MockitoMatchers.any[Message]) - //mockTaskActor.expectMsgType[Message] + // mockTaskActor.expectMsgType[Message] - system.shutdown() - system.awaitTermination() + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala index 03bc8fb..a9c52aa 100644 --- a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala +++ b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,25 +18,24 @@ package io.gearpump.streaming.examples.state.processor +import scala.concurrent.Await +import scala.concurrent.duration._ + import akka.actor.ActorSystem import akka.testkit.TestProbe import com.twitter.algebird.AveragedValue -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.state.api.PersistentTask -import io.gearpump.streaming.state.impl.{WindowConfig, PersistentStateConfig} -import io.gearpump.streaming.task.ReportCheckpointClock -import io.gearpump.streaming.transaction.api.CheckpointStoreFactory -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.streaming.state.impl.InMemoryCheckpointStoreFactory -import io.gearpump.streaming.task.StartTime import org.mockito.Mockito._ import org.scalacheck.Gen -import org.scalatest.{Matchers, PropSpec} import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} -import scala.concurrent.duration._ - +import io.gearpump.Message +import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.MockUtil +import io.gearpump.streaming.state.api.PersistentTask +import io.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig, WindowConfig} +import io.gearpump.streaming.task.{ReportCheckpointClock, StartTime} +import io.gearpump.streaming.transaction.api.CheckpointStoreFactory class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Matchers { property("WindowAverageProcessor should update state") { @@ -51,10 +50,11 @@ class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Match val windowStep = num val conf = UserConfig.empty - .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true) - .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num) - .withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, new InMemoryCheckpointStoreFactory) - .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep)) + .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true) + .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num) + .withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, + new InMemoryCheckpointStoreFactory) + .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep)) val windowAverage = new WindowAverageProcessor(taskContext, conf) @@ -69,19 +69,18 @@ class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Match windowAverage.state.get shouldBe Some(AveragedValue(i + 1, data)) } - // next checkpoint time is at num - // not yet + // Next checkpoint time is not arrived yet when(taskContext.upstreamMinClock).thenReturn(0L) windowAverage.onNext(PersistentTask.CHECKPOINT) - appMaster.expectNoMsg(10 milliseconds) + appMaster.expectNoMsg(10.milliseconds) - // time to checkpoint + // Time to checkpoint when(taskContext.upstreamMinClock).thenReturn(num) windowAverage.onNext(PersistentTask.CHECKPOINT) appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num)) } - system.shutdown() - system.awaitTermination() + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css b/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css index 4693dc6..182d722 100644 --- a/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css +++ b/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css @@ -1,16 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + .ui-datepicker { font-size: 11px; - } +} + .sidebar-label { - font-size:15px; + font-size: 15px; font-family: calibri, Arial, Helvetica, sans-serif; } - + .help { - font-size:12px; + font-size: 12px; font-family: calibri, Arial, Helvetica, sans-serif; } - + div.splitter { margin: 12px 0px 7px 0px; clear: both; @@ -18,13 +37,13 @@ div.splitter { } input.sidebar { - width:165px + width: 165px } select.sidebar { - width:198px + width: 198px } - + table.dataintable { font-family: calibri, Arial, Helvetica, sans-serif; font-size: 15px; @@ -48,49 +67,49 @@ table.dataintable td { border: 1px solid #AAA; } -#search{ - width:100px; - height:25px; - position:relative; - left:0px; - top:5px; +#search { + width: 100px; + height: 25px; + position: relative; + left: 0px; + top: 5px; } -#mytable{ - width:100%; - height:300; - float:left; +#mytable { + width: 100%; + height: 300; + float: left; } -#mychart{ - height:250px; - width:100%; +#mychart { + height: 250px; + width: 100%; } -#Menu{ - height:100%; - width:245px; - float:left; +#Menu { + height: 100%; + width: 245px; + float: left; } -#header{ - height:115px; +#header { + height: 115px; background-image: url(header.png); } -#body{ - height:100%; - width:100%; +#body { + height: 100%; + width: 100%; background-image: url(body.png); - background-size:100% 100%; + background-size: 100% 100%; } -#footer{ - color:white; - height:70px; - line-height:70px; - text-align:middle; - clear:both; - text-align:center; +#footer { + color: white; + height: 70px; + line-height: 70px; + text-align: middle; + clear: both; + text-align: center; background-image: url(foot.png); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js b/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js index 4e5e5f6..97f1e07 100644 --- a/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js +++ b/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js @@ -1,10 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + function initChart(chartid, tableid, stockId) { require.config({ paths: { - echarts: 'http://echarts.baidu.com/build/dist' + echarts: 'http://echarts.baidu.com/build/dist' } }); - + require( [ 'echarts', @@ -17,13 +35,13 @@ function initChart(chartid, tableid, stockId) { var dataPoints = 100; var timeTicket; clearInterval(timeTicket); - timeTicket = setInterval(function (){ - $.getJSON( "report/" + stockId, function( json ) { + timeTicket = setInterval(function () { + $.getJSON("report/" + stockId, function (json) { STOCK_NAME = json.name - + var maxDrawnDown = json.currentMax[0].max.price - json.currentMax[0].min.price; - var time = new Date(json.currentMax[0].current.timestamp).toLocaleTimeString().replace(/^\D*/,''); - // å¨ææ°æ®æ¥å£ addData + var time = new Date(json.currentMax[0].current.timestamp).toLocaleTimeString().replace(/^\D*/, ''); + // å¨ææ°æ®æ¥å£ addData myChart.addData([ [ 0, // ç³»åç´¢å¼ @@ -40,91 +58,91 @@ function initChart(chartid, tableid, stockId) { time ] ]); - document.getElementById(chartid).style.display="block" + document.getElementById(chartid).style.display = "block" document.getElementById(tableid).innerHTML = "<pre>" + JSON.stringify(json, null, 2) + "</pre>" - }); + }); }, 2000); var subtext_ = "Draw Down" var option = { - title : { - text: 'Stock Analysis', - subtext: "Max " + subtext_ + title: { + text: 'Stock Analysis', + subtext: "Max " + subtext_ }, - tooltip : { - trigger: 'axis' + tooltip: { + trigger: 'axis' }, legend: { - data:["Current Price", "Current Draw Down"] + data: ["Current Price", "Current Draw Down"] }, toolbox: { - show : false, - feature : { - mark : {show: true}, - dataView : {show: true, readOnly: false}, - magicType : {show: true, type: ['line', 'bar']}, - restore : {show: true}, - saveAsImage : {show: true} + show: false, + feature: { + mark: {show: true}, + dataView: {show: true, readOnly: false}, + magicType: {show: true, type: ['line', 'bar']}, + restore: {show: true}, + saveAsImage: {show: true} } }, - dataZoom : { - show : false, - start : 0, - end : 100 + dataZoom: { + show: false, + start: 0, + end: 100 }, - xAxis : [ + xAxis: [ { - type : 'category', - boundaryGap : true, - data : (function (){ + type: 'category', + boundaryGap: true, + data: (function () { var now = new Date(); var res = []; var len = dataPoints; while (len--) { - res.unshift(now.toLocaleTimeString().replace(/^\D*/,'')); + res.unshift(now.toLocaleTimeString().replace(/^\D*/, '')); now = new Date(now - 2000); } return res; })() } ], - yAxis : [ + yAxis: [ { - type : 'value', + type: 'value', scale: true, - name : subtext_ + ' ä»·æ ¼/å ', + name: subtext_ + ' ä»·æ ¼/å ', boundaryGap: [0, 0.3] }, { - type : 'value', + type: 'value', scale: true, - name : 'Current ä»·æ ¼/å ', + name: 'Current ä»·æ ¼/å ', boundaryGap: [0, 0.1] } ], - series : [ + series: [ { - name:"Current Draw Down", - type:'line', - data:(function (){ + name: "Current Draw Down", + type: 'line', + data: (function () { var res = []; var len = dataPoints; while (len--) { - res.push(0); + res.push(0); } return res; })() }, { - name:"Current Price", - type:'line', + name: "Current Price", + type: 'line', yAxisIndex: 1, - data:(function (){ + data: (function () { var res = []; var len = dataPoints; while (len--) { - res.push(0); + res.push(0); } return res; })() http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/stockcrawler/src/main/resources/stock/stock.html ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/stock.html b/examples/streaming/stockcrawler/src/main/resources/stock/stock.html index 224473f..9682a53 100644 --- a/examples/streaming/stockcrawler/src/main/resources/stock/stock.html +++ b/examples/streaming/stockcrawler/src/main/resources/stock/stock.html @@ -1,66 +1,87 @@ <!DOCTYPE html> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + <html> <head> - <meta charset="utf-8"> - <link rel=stylesheet type=text/css href="css/custom.css"> - <script src="http://echarts.baidu.com/build/dist/echarts.js"></script> - <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script> - <script src="js/stock.js"></script> - <script type="text/javascript"> - function search_onclick(){ - var stockId = document.getElementById('stockId').value - initChart("mychart", "mytable", stockId) - } - </script> + <meta charset="utf-8"> + <link rel=stylesheet type=text/css href="css/custom.css"> + <script src="http://echarts.baidu.com/build/dist/echarts.js"></script> + <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script> + <script src="js/stock.js"></script> + <script type="text/javascript"> + function search_onclick() { + var stockId = document.getElementById('stockId').value + initChart("mychart", "mytable", stockId) + } + </script> </head> <body style="background-color:#F2F2F2"> <div id="container" style="width:882px; height:450px;margin-left:auto;margin-right:auto;"> - <div style="height:0px"></div> - <div id="header"> - <div style="font-weight:600;position:relative;left:50px;top:50px;font-family: calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white"> - Big Data Stock Analysis Demo - </div> + <div style="height:0px"></div> + <div id="header"> + <div + style="font-weight:600;position:relative;left:50px;top:50px;font-family: calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white"> + Big Data Stock Analysis Demo </div> - <div id="body"> - <div id="Menu"> - <div style="position:relative;margin-left:30px; margin-right:20px;margin-top:20px;"> - <!-- form to post to accompany to get accompanying cars --> + </div> + <div id="body"> + <div id="Menu"> + <div style="position:relative;margin-left:30px; margin-right:20px;margin-top:20px;"> + <!-- form to post to accompany to get accompanying cars --> - <table style="width:100%"> - <tr> - <td class="sidebar-label">Stock Id:</td> - </tr> - <tr> - <td class="sidebar-label">Example: sh600019, sz000002</td> - </tr> - <tr> - <td style="vertical-align:top;"> - <input id="stockId" class="sidebar" type="text" name="stockId"/> - </td> - </tr> - </table> - <div class="splitter"></div> - <div> - <button id="search" onclick="search_onclick()">Search</button> - </div> - </div> + <table style="width:100%"> + <tr> + <td class="sidebar-label">Stock Id:</td> + </tr> + <tr> + <td class="sidebar-label">Example: sh600019, sz000002</td> + </tr> + <tr> + <td style="vertical-align:top;"> + <input id="stockId" class="sidebar" type="text" name="stockId"/> + </td> + </tr> + </table> + <div class="splitter"></div> + <div> + <button id="search" onclick="search_onclick()">Search</button> </div> - <div id="content" style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;"> - <div style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family: calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black"> - Analysis Result: - </div> - <div style="height:7px;background-color:#92BDF2;"></div> + </div> + </div> + <div id="content" + style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;"> + <div + style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family: calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black"> + Analysis Result: + </div> + <div style="height:7px;background-color:#92BDF2;"></div> - <div id="mychart"></div> + <div id="mychart"></div> - <div id="mytable"></div> - </div> + <div id="mytable"></div> </div> - <div id="footer"> - Big Data Team @ Intel - </div> + </div> + <div id="footer"> + Big Data Team @ Intel + </div> </div> </body> </html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Analyzer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Analyzer.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Analyzer.scala index a37ab7f..1aea563 100644 --- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Analyzer.scala +++ b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Analyzer.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,26 +16,26 @@ * limitations under the License. */ - package io.gearpump.streaming.examples.stock +import scala.collection.immutable + import akka.actor.Actor.Receive -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import Analyzer.HistoricalStates -import Price._ -import io.gearpump.util.LogUtil import org.joda.time.DateTime import org.joda.time.format.{DateTimeFormat, DateTimeFormatter} -import scala.collection.immutable +import io.gearpump.Message +import io.gearpump.cluster.UserConfig +import io.gearpump.streaming.examples.stock.Analyzer.HistoricalStates +import io.gearpump.streaming.examples.stock.Price._ +import io.gearpump.streaming.task.{StartTime, Task, TaskContext} +import io.gearpump.util.LogUtil /** * Dradown analyzer * Definition: http://en.wikipedia.org/wiki/Drawdown_(economics) */ -class Analyzer (taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { +class Analyzer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy" @@ -62,7 +62,7 @@ class Analyzer (taskContext: TaskContext, conf: UserConfig) extends Task(taskCon } override def receiveUnManagedMessage: Receive = { - case get@ GetReport(stockId, date) => + case get@GetReport(stockId, date) => var currentMax = currentDownwardsStates.get(stockId) val dateTime = Option(date) match { @@ -77,7 +77,7 @@ class Analyzer (taskContext: TaskContext, conf: UserConfig) extends Task(taskCon val name = stockInfos.get(stockId).map(_.name).getOrElse("") sender ! Report(stockId, name, dateTime.toString, historyMax, currentMax) } - + private def updateCurrentStates(stock: StockPrice) = { var downwardsState: StockPriceState = null if (currentDownwardsStates.contains(stock.stockId)) { @@ -88,32 +88,33 @@ class Analyzer (taskContext: TaskContext, conf: UserConfig) extends Task(taskCon currentDownwardsStates += stock.stockId -> downwardsState downwardsState } - - //Update the stock's latest state. + + // Update the stock's latest state. private def generateNewState(currentPrice: Price, oldState: StockPriceState): StockPriceState = { - if(currentPrice.price > oldState.max.price) { + if (currentPrice.price > oldState.max.price) { StockPriceState(oldState.stockID, currentPrice, currentPrice, currentPrice) } else { - val newState = StockPriceState(oldState.stockID, oldState.max, Price.min(currentPrice, oldState.min), currentPrice) + val newState = StockPriceState(oldState.stockID, oldState.max, + Price.min(currentPrice, oldState.min), currentPrice) newState } } - + private def checkDate(stock: StockPrice) = { if (currentDownwardsStates.contains(stock.stockId)) { val now = new DateTime(stock.timestamp) val lastTime = new DateTime(currentDownwardsStates.get(stock.stockId).get.current.timestamp) - //New day - if(now.getDayOfYear > lastTime.getDayOfYear || now.getYear > lastTime.getYear) { + // New day + if (now.getDayOfYear > lastTime.getDayOfYear || now.getYear > lastTime.getYear) { currentDownwardsStates -= stock.stockId } } } - - private def parseDate(format: DateTimeFormatter, input: String): DateTime = { + + private def parseDate(format: DateTimeFormatter, input: String): DateTime = { format.parseDateTime(input) } - + private def handleHistoricalQuery(stockId: String, date: DateTime) = { val maximal = historicalStates.getHistoricalMaximal(stockId, date) maximal @@ -123,28 +124,28 @@ class Analyzer (taskContext: TaskContext, conf: UserConfig) extends Task(taskCon object Analyzer { class HistoricalStates { - val LOG =LogUtil.getLogger(getClass) + val LOG = LogUtil.getLogger(getClass) val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy" private var historicalMaxRaise = new immutable.HashMap[(String, DateTime), StockPriceState] private var historicalMaxDrawdown = new immutable.HashMap[(String, DateTime), StockPriceState] - + def updatePresentMaximal(newState: StockPriceState): Option[StockPriceState] = { val date = Analyzer.getDateFromTimeStamp(newState.current.timestamp) var newMaximalState: Option[StockPriceState] = null if (newState.max.price < Float.MinPositiveValue) { newMaximalState = generateNewMaximal(newState, date, historicalMaxRaise) - if(newMaximalState.nonEmpty) { - historicalMaxRaise += (newState.stockID, date)-> newMaximalState.get + if (newMaximalState.nonEmpty) { + historicalMaxRaise += (newState.stockID, date) -> newMaximalState.get } } else { newMaximalState = generateNewMaximal(newState, date, historicalMaxDrawdown) - if(newMaximalState.nonEmpty) { + if (newMaximalState.nonEmpty) { historicalMaxDrawdown += (newState.stockID, date) -> newMaximalState.get } } newMaximalState } - + def getHistoricalMaximal(stockId: String, date: DateTime): Option[StockPriceState] = { historicalMaxDrawdown.get((stockId, date)) } @@ -152,17 +153,17 @@ object Analyzer { private def generateNewMaximal( state: StockPriceState, date: DateTime, - map: immutable.HashMap[(String, DateTime), StockPriceState]): - Option[StockPriceState] = { + map: immutable.HashMap[(String, DateTime), StockPriceState]) + : Option[StockPriceState] = { val maximal = map.get((state.stockID, date)) - if(maximal.nonEmpty && maximal.get.drawDown > state.drawDown) { + if (maximal.nonEmpty && maximal.get.drawDown > state.drawDown) { None } else { Some(state) } } } - + def getDateFromTimeStamp(timestamp: Long): DateTime = { new DateTime(timestamp).withTimeAtStartOfDay() } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Crawler.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Crawler.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Crawler.scala index 189ddba..da5ab63 100644 --- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Crawler.scala +++ b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Crawler.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,14 +16,13 @@ * limitations under the License. */ - package io.gearpump.streaming.examples.stock -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} +import scala.concurrent.duration._ + import io.gearpump.Message import io.gearpump.cluster.UserConfig - -import scala.concurrent.duration._ +import io.gearpump.streaming.task.{StartTime, Task, TaskContext} class Crawler(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { @@ -48,12 +47,12 @@ class Crawler(taskContext: TaskContext, conf: UserConfig) extends Task(taskConte val stockMarket = conf.getValue[StockMarket](classOf[StockMarket].getName).get - override def onStart(startTime : StartTime) : Unit = { - //nothing + override def onStart(startTime: StartTime): Unit = { + // Nothing } - override def onNext(msg : Message) : Unit = { - stockMarket.getPrice(stocks).foreach {price => + override def onNext(msg: Message): Unit = { + stockMarket.getPrice(stocks).foreach { price => output(new Message(price, price.timestamp)) } scheduleOnce(5.seconds)(self ! FetchStockPrice) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Data.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Data.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Data.scala index cb90a79..5525524 100644 --- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Data.scala +++ b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/Data.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,26 +16,28 @@ * limitations under the License. */ - package io.gearpump.streaming.examples.stock -import akka.actor.ActorRef - -case class StockPrice(stockId: String, name: String, price: String, delta: String, pecent: String, volume: String, money: String, timestamp: Long) { +// scalastyle:off equals.hash.code case class has equals defined +case class StockPrice( + stockId: String, name: String, price: String, delta: String, pecent: String, volume: String, + money: String, timestamp: Long) { override def hashCode: Int = stockId.hashCode } - +// scalastyle:on equals.hash.code case class has equals defined case class Price(price: Float, timestamp: Long) object Price { + import scala.language.implicitConversions + implicit def StockPriceToPrice(stock: StockPrice): Price = { Price(stock.price.toFloat, stock.timestamp) } - def min(first: Price, second: Price) = { - if(first.price < second.price) { + def min(first: Price, second: Price): Price = { + if (first.price < second.price) { first } else { second @@ -45,13 +47,15 @@ object Price { case class StockPriceState(stockID: String, max: Price, min: Price, current: Price) { - def drawDownPeriod = min.timestamp - max.timestamp + def drawDownPeriod: Long = min.timestamp - max.timestamp - def recoveryPeriod = current.timestamp - min.timestamp + def recoveryPeriod: Long = current.timestamp - min.timestamp - def drawDown = max.price - min.price + def drawDown: Float = max.price - min.price } case class GetReport(stockId: String, date: String) -case class Report(stockId: String, name: String, date: String, historyMax: Option[StockPriceState], currentMax: Option[StockPriceState]) +case class Report( + stockId: String, name: String, date: String, historyMax: Option[StockPriceState], + currentMax: Option[StockPriceState]) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala index aa8541a..33db128 100644 --- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala +++ b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/QueryServer.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,37 +16,36 @@ * limitations under the License. */ - package io.gearpump.streaming.examples.stock import java.util.concurrent.TimeUnit +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} import akka.actor.Actor._ -import akka.actor.{Actor, Props} +import akka.actor.{Actor, ActorRefFactory, Props} import akka.io.IO import akka.pattern.ask +import spray.can.Http +import spray.http.StatusCodes +import spray.json._ +import spray.routing.{HttpService, Route} +import upickle.default.write + import io.gearpump.Message import io.gearpump.cluster.MasterToAppMaster.AppMasterDataDetailRequest import io.gearpump.cluster.UserConfig import io.gearpump.streaming.ProcessorId import io.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} -import io.gearpump.streaming.appmaster.{AppMaster, ProcessorSummary, StreamAppMasterSummary} +import io.gearpump.streaming.appmaster.{ProcessorSummary, StreamAppMasterSummary} import io.gearpump.streaming.examples.stock.QueryServer.WebServer import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} -import spray.can.Http -import spray.http.StatusCodes -import spray.json._ -import spray.routing.HttpService -import upickle.default.write -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success} +class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + import scala.concurrent.ExecutionContext.Implicits.global -class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf){ import taskContext.{appId, appMaster} - import ExecutionContext.Implicits.global - var analyzer: (ProcessorId, ProcessorSummary) = null implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS) @@ -56,7 +55,7 @@ class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskC } override def onNext(msg: Message): Unit = { - //Skip + // Skip } override def receiveUnManagedMessage: Receive = messageHandler @@ -67,14 +66,14 @@ class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskC val (processorId, processor) = kv processor.taskClass == classOf[Analyzer].getName }.get - case getReport @ GetReport(stockId, date) => + case getReport@GetReport(stockId, date) => val parallism = analyzer._2.parallelism val processorId = analyzer._1 val analyzerTaskId = TaskId(processorId, (stockId.hashCode & Integer.MAX_VALUE) % parallism) val requester = sender import scala.concurrent.Future (appMaster ? LookupTaskActorRef(analyzerTaskId)) - .asInstanceOf[Future[TaskActorRef]].flatMap {task => + .asInstanceOf[Future[TaskActorRef]].flatMap { task => (task.task ? getReport).asInstanceOf[Future[Report]] }.map { report => @@ -82,7 +81,7 @@ class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskC requester ! report } case _ => - //ignore + // Ignore } } @@ -91,21 +90,22 @@ object QueryServer { import context.dispatcher implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS) - def actorRefFactory = context + def actorRefFactory: ActorRefFactory = context implicit val system = context.system IO(Http) ! Http.Bind(self, interface = "localhost", port = 8080) override def receive: Receive = runRoute(webServer ~ staticRoute) - def webServer = { - path("report" / PathElement) { stockId => + def webServer: Route = { + path("report" / Segment) { stockId => get { onComplete((context.parent ? GetReport(stockId, null)).asInstanceOf[Future[Report]]) { case Success(report: Report) => val json = write(report) complete(pretty(json)) - case Failure(ex) => complete(StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}") + case Failure(ex) => complete(StatusCodes.InternalServerError, + s"An error occurred: ${ex.getMessage}") } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/StockMarket.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/StockMarket.scala b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/StockMarket.scala index 5686ec7..508c282 100644 --- a/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/StockMarket.scala +++ b/examples/streaming/stockcrawler/src/main/scala/io/gearpump/streaming/examples/stock/StockMarket.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,19 +19,20 @@ package io.gearpump.streaming.examples.stock import java.nio.charset.Charset +import scala.io.Codec + import org.apache.commons.httpclient.methods.GetMethod import org.apache.commons.httpclient.{HttpClient, MultiThreadedHttpConnectionManager} -import StockMarket.ServiceHour -import io.gearpump.transport.HostPort -import io.gearpump.util.LogUtil import org.htmlcleaner.{HtmlCleaner, TagNode} import org.joda.time.{DateTime, DateTimeZone} -import scala.io.Codec +import io.gearpump.streaming.examples.stock.StockMarket.ServiceHour +import io.gearpump.transport.HostPort +import io.gearpump.util.LogUtil class StockMarket(service: ServiceHour, proxy: HostPort = null) extends Serializable { - def LOG = LogUtil.getLogger(getClass) + private def LOG = LogUtil.getLogger(getClass) @transient private var connectionManager: MultiThreadedHttpConnectionManager = null @@ -41,7 +42,7 @@ class StockMarket(service: ServiceHour, proxy: HostPort = null) extends Serializ private val stockPriceParser = """^var\shq_str_s_([a-z0-9A-Z]+)="([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+)";$""".r - def shutdown: Unit = { + def shutdown(): Unit = { Option(connectionManager).map(_.shutdown()) } @@ -58,8 +59,7 @@ class StockMarket(service: ServiceHour, proxy: HostPort = null) extends Serializ _client } - def getPrice(stocks: Array[String]) : Array[StockPrice] = { - + def getPrice(stocks: Array[String]): Array[StockPrice] = { LOG.info(s"getPrice 1") @@ -72,7 +72,8 @@ class StockMarket(service: ServiceHour, proxy: HostPort = null) extends Serializ client.executeMethod(get) val current = System.currentTimeMillis() - val output = scala.io.Source.fromInputStream(get.getResponseBodyAsStream)(new Codec(Charset forName "GBK")).getLines().flatMap { line => + val output = scala.io.Source.fromInputStream(get.getResponseBodyAsStream)( + new Codec(Charset forName "GBK")).getLines().flatMap { line => line match { case stockPriceParser(stockId, name, price, delta, pecent, volume, money) => Some(StockPrice(stockId, name, price, delta, pecent, volume, money, current)) @@ -100,18 +101,18 @@ class StockMarket(service: ServiceHour, proxy: HostPort = null) extends Serializ val root = cleaner.clean(get.getResponseBodyAsStream) - val stockUrls = root.evaluateXPath("//div[@id='quotesearch']//li//a[@href]"); + val stockUrls = root.evaluateXPath("//div[@id='quotesearch']//li//a[@href]") val elements = root.getElementsByName("a", true) val hrefs = (0 until stockUrls.length) .map(stockUrls(_).asInstanceOf[TagNode].getAttributeByName("href")) - .map {url => - url match { - case urlPattern(code) => code - case _ => null - } - }.toArray + .map { url => + url match { + case urlPattern(code) => code + case _ => null + } + }.toArray hrefs } } @@ -123,14 +124,14 @@ object StockMarket { /** * Morning openning: 9:30 am - 11:30 am */ - val morningStart = GMT8(new DateTime(0,1,1,9,30)).getMillis - val morningEnd = GMT8(new DateTime(0,1,1,11,30)).getMillis + val morningStart = GMT8(new DateTime(0, 1, 1, 9, 30)).getMillis + val morningEnd = GMT8(new DateTime(0, 1, 1, 11, 30)).getMillis /** * After noon openning: 13:00 pm - 15:00 pm */ - val afternoonStart = GMT8(new DateTime(0,1,1,13,0)).getMillis - val afternoonEnd = GMT8(new DateTime(0,1,1,15,0)).getMillis + val afternoonStart = GMT8(new DateTime(0, 1, 1, 13, 0)).getMillis + val afternoonEnd = GMT8(new DateTime(0, 1, 1, 15, 0)).getMillis def inService: Boolean = {
