http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala index dbbe738..aca2736 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,28 +21,29 @@ package io.gearpump.experiments.storm.main import java.io.{File, FileOutputStream, FileWriter} import java.nio.ByteBuffer import java.nio.channels.{Channels, WritableByteChannel} -import java.util.{Map => JMap, HashMap => JHashMap, UUID} +import java.util.{HashMap => JHashMap, Map => JMap, UUID} +import scala.collection.JavaConverters._ +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} import akka.actor.ActorSystem +import com.typesafe.config.ConfigValueFactory import backtype.storm.Config import backtype.storm.generated._ import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer} -import backtype.storm.utils.TimeCacheMap.ExpiredCallback -import backtype.storm.utils.{TimeCacheMap, Utils} -import com.typesafe.config.ConfigValueFactory -import io.gearpump.cluster.{MasterToAppMaster, UserConfig} +import backtype.storm.utils.Utils +import org.apache.storm.shade.org.json.simple.JSONValue +import org.apache.storm.shade.org.yaml.snakeyaml.Yaml +import org.slf4j.Logger + import io.gearpump.cluster.client.ClientContext import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import io.gearpump.cluster.{MasterToAppMaster, UserConfig} import io.gearpump.experiments.storm.topology.GearpumpStormTopology -import io.gearpump.experiments.storm.util.{GraphBuilder, StormConstants, StormUtil} +import io.gearpump.experiments.storm.util.TimeCacheMapWrapper.Callback +import io.gearpump.experiments.storm.util.{GraphBuilder, StormConstants, StormUtil, TimeCacheMapWrapper} import io.gearpump.streaming.StreamApplication import io.gearpump.util.{AkkaApp, Constants, LogUtil} -import org.apache.storm.shade.org.json.simple.JSONValue -import org.apache.storm.shade.org.yaml.snakeyaml.Yaml -import org.slf4j.Logger - -import scala.collection.JavaConverters._ -import scala.concurrent.Future object GearpumpNimbus extends AkkaApp with ArgumentsParser { private val THRIFT_PORT = StormUtil.getThriftPort() @@ -50,7 +51,8 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser { private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpNimbus]) override val options: Array[(String, CLIOption[Any])] = Array( - OUTPUT -> CLIOption[String]("<output path for configuration file>", required = false, defaultValue = Some("app.yaml")) + OUTPUT -> CLIOption[String]("<output path for configuration file>", + required = false, defaultValue = Some("app.yaml")) ) override def main(inputAkkaConf: Config, args: Array[String]): Unit = { @@ -72,11 +74,13 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser { val thriftServer = createServer(clientContext, stormConf) thriftServer.serve() } - system.awaitTermination() + Await.result(system.whenTerminated, Duration.Inf) } - private def createServer(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]): ThriftServer = { - val processor = new Nimbus.Processor[GearpumpNimbus](new GearpumpNimbus(clientContext, stormConf)) + private def createServer( + clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]): ThriftServer = { + val processor = new Nimbus.Processor[GearpumpNimbus](new GearpumpNimbus(clientContext, + stormConf)) val connectionType = ThriftConnectionType.NIMBUS new ThriftServer(stormConf, processor, connectionType) } @@ -98,43 +102,56 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser { } } - import Constants._ + import io.gearpump.util.Constants._ private def updateClientConfig(config: Config): Config = { val storm = s"<${GEARPUMP_HOME}>/lib/storm/*" - val appClassPath = s"$storm${File.pathSeparator}" + config.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH) - val executorClassPath = s"$storm${File.pathSeparator}" + config.getString(Constants.GEARPUMP_EXECUTOR_EXTRA_CLASSPATH) + val appClassPath = s"$storm${File.pathSeparator}" + + config.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH) + val executorClassPath = s"$storm${File.pathSeparator}" + + config.getString(Constants.GEARPUMP_EXECUTOR_EXTRA_CLASSPATH) - val updated = config.withValue(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(appClassPath)) - .withValue(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(executorClassPath)) + val updated = config + .withValue(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(appClassPath)) + .withValue(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH, + ConfigValueFactory.fromAnyRef(executorClassPath)) if (config.hasPath(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) { - val serializerConfig = ConfigValueFactory.fromAnyRef(config.getString(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) + val serializerConfig = ConfigValueFactory.fromAnyRef( + config.getString(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) updated.withValue(GEARPUMP_SERIALIZER_POOL, serializerConfig) } else { updated } } - } -class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]) extends Nimbus.Iface { +class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]) + extends Nimbus.Iface { + import io.gearpump.experiments.storm.main.GearpumpNimbus._ private var applications = Map.empty[String, Int] private var topologies = Map.empty[String, TopologyData] - private val expireSeconds = StormUtil.getInt(stormConf, Config.NIMBUS_FILE_COPY_EXPIRATION_SECS).get - private val expiredCallback = new ExpiredCallback[String, WritableByteChannel] { + private val expireSeconds = StormUtil.getInt(stormConf, + Config.NIMBUS_FILE_COPY_EXPIRATION_SECS).get + private val expiredCallback = new Callback[String, WritableByteChannel] { override def expire(k: String, v: WritableByteChannel): Unit = { v.close() } } - private val fileCacheMap = new TimeCacheMap[String, WritableByteChannel](expireSeconds, expiredCallback) - - override def submitTopology(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology): Unit = { - submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, new SubmitOptions(TopologyInitialStatus.ACTIVE)) + private val fileCacheMap = new TimeCacheMapWrapper[String, WritableByteChannel](expireSeconds, + expiredCallback) + + override def submitTopology( + name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology) + : Unit = { + submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, + new SubmitOptions(TopologyInitialStatus.ACTIVE)) } - override def submitTopologyWithOpts(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit = { + override def submitTopologyWithOpts( + name: String, uploadedJarLocation: String, + jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit = { LOG.info(s"Submitted topology $name") implicit val system = clientContext.system val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf) @@ -142,8 +159,8 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe val workerNum = StormUtil.getInt(stormConfig, Config.TOPOLOGY_WORKERS).getOrElse(1) val processorGraph = GraphBuilder.build(gearpumpStormTopology) val config = UserConfig.empty - .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology) - .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig) + .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology) + .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig) val app = StreamApplication(name, processorGraph, config) LOG.info(s"jar file uploaded to $uploadedJarLocation") val appId = clientContext.submit(app, uploadedJarLocation, workerNum) @@ -167,13 +184,13 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe } override def getTopology(name: String): StormTopology = { - updateApps + updateApps() topologies.getOrElse(name, throw new RuntimeException(s"topology $name not found")).topology } override def getTopologyConf(name: String): String = { - updateApps + updateApps() JSONValue.toJSONString(topologies.getOrElse(name, throw new RuntimeException(s"topology $name not found")).config) } @@ -210,7 +227,7 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe } override def getClusterInfo: ClusterSummary = { - updateApps + updateApps() val topologySummaryList = topologies.map { case (name, _) => new TopologySummary(name, name, 0, 0, 0, 0, "") }.toSeq @@ -224,7 +241,7 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe override def uploadNewCredentials(s: String, credentials: Credentials): Unit = { throw new UnsupportedOperationException } - override def activate(name: String): Unit = { + override def activate(name: String): Unit = { throw new UnsupportedOperationException } @@ -250,7 +267,7 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe throw new UnsupportedOperationException } - private def updateApps: Unit = { + private def updateApps(): Unit = { clientContext.listApps.appMasters.foreach { app => val name = app.appName if (applications.contains(name)) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala index d4c0e8a..fbdd579 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,6 +20,7 @@ package io.gearpump.experiments.storm.main import backtype.storm.Config import backtype.storm.utils.Utils + import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} import io.gearpump.util.Constants._ import io.gearpump.util.{AkkaApp, LogUtil, Util} @@ -29,14 +30,15 @@ object GearpumpStormClient extends AkkaApp with ArgumentsParser { override val options: Array[(String, CLIOption[Any])] = Array( "jar" -> CLIOption[String]("<storm jar>", required = true), "config" -> CLIOption[Int]("<storm config file>", required = true), - "verbose" -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false))) + "verbose" -> CLIOption("<print verbose log on console>", required = false, + defaultValue = Some(false))) override def main(inputAkkaConf: Config, args: Array[String]): Unit = { val config = parse(args) val verbose = config.getBoolean("verbose") if (verbose) { - LogUtil.verboseLogToConsole + LogUtil.verboseLogToConsole() } val jar = config.getString("jar") @@ -54,11 +56,12 @@ object GearpumpStormClient extends AkkaApp with ArgumentsParser { val classPath = Array(s"$gearpumpHome/lib/*", s"$gearpumpHome/lib/storm/*", jar) val process = Util.startProcess(stormOptions, classPath, topology, stormArgs) - // wait till the process exit + // Waits till the process exit val exit = process.exitValue() if (exit != 0) { - throw new Exception(s"failed to submit jar, exit code $exit, error summary: ${process.logger.error}") + throw new Exception(s"failed to submit jar, exit code $exit, " + + s"error summary: ${process.logger.error}") } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala index 04ccb91..e47e11d 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,25 +20,27 @@ package io.gearpump.experiments.storm.partitioner import io.gearpump.Message import io.gearpump.experiments.storm.topology.GearpumpTuple -import io.gearpump.experiments.storm.util.StormOutputCollector -import io.gearpump.partitioner.{Partitioner, MulticastPartitioner} +import io.gearpump.partitioner.{MulticastPartitioner, Partitioner} /** - * this is a partitioner bound to a target Storm component - * partitioning is already done in [[StormOutputCollector]] and - * kept in "targetPartitions" of [[GearpumpTuple]] + * Partitioner bound to a target Storm component + * + * Partitioning is already done in [[io.gearpump.experiments.storm.util.StormOutputCollector]] and + * kept in "targetPartitions" of [[io.gearpump.experiments.storm.topology.GearpumpTuple]] * the partitioner just returns the partitions of the target * * In Gearpump, a message is sent from a task to all the subscribers. * In Storm, however, a message is sent to one or more of the subscribers. - * Hence, we have to do the partitioning in [[StormOutputCollector]] till the Storm way - * is supported in Gearpump + * Hence, we have to do the partitioning in + * [[io.gearpump.experiments.storm.util.StormOutputCollector]] till the Storm way is supported + * in Gearpump * * @param target target storm component id */ private[storm] class StormPartitioner(target: String) extends MulticastPartitioner { - override def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = { + override def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int) + : Array[Int] = { val stormTuple = msg.msg.asInstanceOf[GearpumpTuple] stormTuple.targetPartitions.getOrElse(target, Array(Partitioner.UNKNOWN_PARTITION_ID)) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala index 3f1700f..5920bf8 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,9 +22,10 @@ import java.util.{Collection => JCollection, List => JList} import backtype.storm.task.IOutputCollector import backtype.storm.tuple.Tuple + import io.gearpump.experiments.storm.topology.TimedTuple -import io.gearpump.experiments.storm.util.StormOutputCollector import io.gearpump.experiments.storm.util.StormConstants._ +import io.gearpump.experiments.storm.util.StormOutputCollector import io.gearpump.streaming.task.ReportCheckpointClock /** @@ -35,11 +36,13 @@ private[storm] class StormBoltOutputCollector(collector: StormOutputCollector, private var reportTime = 0L private var maxAckTime = 0L - override def emit(streamId: String, anchors: JCollection[Tuple], tuple: JList[AnyRef]): JList[Integer] = { + override def emit( + streamId: String, anchors: JCollection[Tuple], tuple: JList[AnyRef]): JList[Integer] = { collector.emit(streamId, tuple) } - override def emitDirect(taskId: Int, streamId: String, anchors: JCollection[Tuple], tuple: JList[AnyRef]): Unit = { + override def emitDirect( + taskId: Int, streamId: String, anchors: JCollection[Tuple], tuple: JList[AnyRef]): Unit = { collector.emitDirect(taskId, streamId, tuple) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormProcessor.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormProcessor.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormProcessor.scala index b7dc3aa..67548b3 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormProcessor.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormProcessor.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,16 +19,13 @@ package io.gearpump.experiments.storm.processor import java.util.concurrent.TimeUnit +import scala.concurrent.duration.Duration import io.gearpump.Message import io.gearpump.cluster.UserConfig import io.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt import io.gearpump.experiments.storm.util._ import io.gearpump.streaming.task._ -import io.gearpump.util.LogUtil -import org.slf4j.Logger - -import scala.concurrent.duration.Duration object StormProcessor { private[storm] val TICK = Message("tick") @@ -38,11 +35,11 @@ object StormProcessor { * this is runtime container for Storm bolt */ private[storm] class StormProcessor(gearpumpBolt: GearpumpBolt, - taskContext: TaskContext, conf: UserConfig) + taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { import io.gearpump.experiments.storm.processor.StormProcessor._ - def this(taskContext: TaskContext, conf:UserConfig) = { + def this(taskContext: TaskContext, conf: UserConfig) = { this(StormUtil.getGearpumpStormComponent(taskContext, conf)(taskContext.system) .asInstanceOf[GearpumpBolt], taskContext, conf) } @@ -67,6 +64,8 @@ private[storm] class StormProcessor(gearpumpBolt: GearpumpBolt, } private def scheduleTick(freq: Int): Unit = { - taskContext.scheduleOnce(Duration(freq, TimeUnit.SECONDS)){ self ! TICK } + taskContext.scheduleOnce(Duration(freq, TimeUnit.SECONDS)) { + self ! TICK + } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormProducer.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormProducer.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormProducer.scala index 787326b..39882c4 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormProducer.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormProducer.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,16 +19,16 @@ package io.gearpump.experiments.storm.producer import java.util.concurrent.TimeUnit +import scala.concurrent.duration.Duration import akka.actor.Actor.Receive + import io.gearpump.Message import io.gearpump.cluster.UserConfig import io.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout import io.gearpump.experiments.storm.util._ import io.gearpump.streaming.task._ -import scala.concurrent.duration.Duration - object StormProducer { private[storm] val TIMEOUT = Message("timeout") } @@ -37,7 +37,7 @@ object StormProducer { * this is runtime container for Storm spout */ private[storm] class StormProducer(gearpumpSpout: GearpumpSpout, - taskContext: TaskContext, conf: UserConfig) + taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { import io.gearpump.experiments.storm.producer.StormProducer._ @@ -75,15 +75,17 @@ private[storm] class StormProducer(gearpumpSpout: GearpumpSpout, optClock.foreach { clock => gearpumpSpout.checkpoint(clock) } - getCheckpointClock + getCheckpointClock() } - def getCheckpointClock: Unit = { + def getCheckpointClock(): Unit = { taskContext.scheduleOnce(Duration(StormConstants.CHECKPOINT_INTERVAL_MILLIS, TimeUnit.MILLISECONDS))(taskContext.appMaster ! GetCheckpointClock) } private def scheduleTimeout(timeout: Long): Unit = { - taskContext.scheduleOnce(Duration(timeout, TimeUnit.MILLISECONDS)){ self ! TIMEOUT } + taskContext.scheduleOnce(Duration(timeout, TimeUnit.MILLISECONDS)) { + self ! TIMEOUT + } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala index c60a962..83532dc 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,6 +21,7 @@ package io.gearpump.experiments.storm.producer import java.util.{List => JList} import backtype.storm.spout.{ISpout, ISpoutOutputCollector} + import io.gearpump.TimeStamp import io.gearpump.experiments.storm.util.StormOutputCollector @@ -30,7 +31,8 @@ case class PendingMessage(id: Object, messageTime: TimeStamp, startTime: TimeSta * this is used by Storm Spout to emit messages */ private[storm] class StormSpoutOutputCollector( - collector: StormOutputCollector, spout: ISpout, ackEnabled: Boolean) extends ISpoutOutputCollector { + collector: StormOutputCollector, spout: ISpout, ackEnabled: Boolean) + extends ISpoutOutputCollector { private var checkpointClock = 0L private var pendingMessage: Option[PendingMessage] = None @@ -48,21 +50,21 @@ private[storm] class StormSpoutOutputCollector( throw throwable } - override def emitDirect(taskId: Int, streamId: String, values: JList[AnyRef], messageId: Object): Unit = { + override def emitDirect(taskId: Int, streamId: String, values: JList[AnyRef], messageId: Object) + : Unit = { val curTime = System.currentTimeMillis() collector.setTimestamp(curTime) collector.emitDirect(taskId, streamId, values) setPendingOrAck(messageId, curTime, curTime) } - def ackPendingMessage(checkpointClock: TimeStamp): Unit = { this.checkpointClock = checkpointClock nextPendingMessage.foreach { case PendingMessage(_, messageTime, _) => if (messageTime <= this.checkpointClock) { pendingMessage.foreach { case PendingMessage(id, _, _) => spout.ack(id) - reset + reset() } } } @@ -72,17 +74,18 @@ private[storm] class StormSpoutOutputCollector( pendingMessage.foreach { case PendingMessage(id, _, startTime) => if (System.currentTimeMillis() - startTime >= timeoutMillis) { spout.fail(id) - reset + reset() } } } - private def reset: Unit = { + private def reset(): Unit = { pendingMessage = nextPendingMessage nextPendingMessage = None } - private def setPendingOrAck(messageId: Object, startTime: TimeStamp, messageTime: TimeStamp): Unit = { + private def setPendingOrAck(messageId: Object, startTime: TimeStamp, messageTime: TimeStamp) + : Unit = { if (ackEnabled) { val newPendingMessage = PendingMessage(messageId, messageTime, startTime) pendingMessage match { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponent.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponent.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponent.scala index 32d655d..42dce5f 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponent.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponent.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,8 +19,11 @@ package io.gearpump.experiments.storm.topology import java.io.{File, FileOutputStream, IOException} +import java.util import java.util.jar.JarFile import java.util.{HashMap => JHashMap, List => JList, Map => JMap} +import scala.collection.JavaConverters._ +import scala.concurrent.{Await, Future} import akka.actor.ActorRef import akka.pattern.ask @@ -32,6 +35,9 @@ import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, Topo import backtype.storm.tuple.{Fields, Tuple, TupleImpl} import backtype.storm.utils.Utils import clojure.lang.Atom +import org.apache.commons.io.{FileUtils, IOUtils} +import org.slf4j.Logger + import io.gearpump.experiments.storm.processor.StormBoltOutputCollector import io.gearpump.experiments.storm.producer.StormSpoutOutputCollector import io.gearpump.experiments.storm.util.StormConstants._ @@ -41,12 +47,6 @@ import io.gearpump.streaming.DAG import io.gearpump.streaming.task._ import io.gearpump.util.{Constants, LogUtil} import io.gearpump.{Message, TimeStamp} -import org.apache.commons.io.{FileUtils, IOUtils} -import org.slf4j.Logger - -import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ -import scala.concurrent.{Await, Future} /** * subclass wraps Storm Spout and Bolt, and their lifecycles @@ -68,7 +68,7 @@ trait GearpumpStormComponent { /** * invoked at Task.onStop */ - def stop: Unit = {} + def stop(): Unit = {} } object GearpumpStormComponent { @@ -76,9 +76,10 @@ object GearpumpStormComponent { object GearpumpSpout { def apply(topology: StormTopology, config: JMap[AnyRef, AnyRef], - spoutSpec: SpoutSpec, taskContext: TaskContext): GearpumpSpout = { + spoutSpec: SpoutSpec, taskContext: TaskContext): GearpumpSpout = { val componentCommon = spoutSpec.get_common() - val normalizedConfig = normalizeConfig(config.toMap, componentCommon) + val scalaMap = config.asScala.toMap // Convert to scala immutable map + val normalizedConfig = normalizeConfig(scalaMap, componentCommon) val getTopologyContext = (dag: DAG, taskId: TaskId) => { val stormTaskId = gearpumpTaskIdToStorm(taskId) buildTopologyContext(dag, topology, normalizedConfig, stormTaskId) @@ -122,7 +123,6 @@ object GearpumpStormComponent { private var collector: StormSpoutOutputCollector = null - override def start(startTime: StartTime): Unit = { import taskContext.{appMaster, taskId} @@ -161,8 +161,9 @@ object GearpumpStormComponent { object GearpumpBolt { def apply(topology: StormTopology, config: JMap[AnyRef, AnyRef], - boltSpec: Bolt, taskContext: TaskContext): GearpumpBolt = { - val normalizedConfig = normalizeConfig(config.toMap, boltSpec.get_common()) + boltSpec: Bolt, taskContext: TaskContext): GearpumpBolt = { + val configAsScalaMap = config.asScala.toMap // Convert to scala immutable map + val normalizedConfig = normalizeConfig(configAsScalaMap, boltSpec.get_common()) val getTopologyContext = (dag: DAG, taskId: TaskId) => { val stormTaskId = gearpumpTaskIdToStorm(taskId) buildTopologyContext(dag, topology, normalizedConfig, stormTaskId) @@ -174,8 +175,10 @@ object GearpumpStormComponent { StormOutputCollector(taskContext, topologyContext) } val getTickTuple = (topologyContext: GeneralTopologyContext, freq: Int) => { - new TupleImpl(topologyContext, List(freq.asInstanceOf[java.lang.Integer]), - SYSTEM_TASK_ID, SYSTEM_TICK_STREAM_ID, null) + + val values = new util.ArrayList[Object] // To be compatible with Java interface + values.add(freq.asInstanceOf[java.lang.Integer]) + new TupleImpl(topologyContext, values, SYSTEM_TASK_ID, SYSTEM_TICK_STREAM_ID, null) } GearpumpBolt( normalizedConfig, @@ -218,7 +221,8 @@ object GearpumpStormComponent { override def next(message: Message): Unit = { val timestamp = message.timestamp collector.setTimestamp(timestamp) - bolt.execute(message.msg.asInstanceOf[GearpumpTuple].toTuple(generalTopologyContext, timestamp)) + bolt.execute(message.msg.asInstanceOf[GearpumpTuple].toTuple(generalTopologyContext, + timestamp)) } def getTickFrequency: Option[Int] = { @@ -244,9 +248,9 @@ object GearpumpStormComponent { * @param componentCommon common component parts */ private def normalizeConfig(stormConfig: Map[AnyRef, AnyRef], - componentCommon: ComponentCommon): JMap[AnyRef, AnyRef] = { + componentCommon: ComponentCommon): JMap[AnyRef, AnyRef] = { val config: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef] - config.putAll(stormConfig) + config.putAll(stormConfig.asJava) val componentConfig = parseJsonStringToMap(componentCommon.get_json_conf()) Option(componentConfig.get(Config.TOPOLOGY_TRANSACTIONAL_ID)) .foreach(config.put(Config.TOPOLOGY_TRANSACTIONAL_ID, _)) @@ -261,42 +265,63 @@ object GearpumpStormComponent { Await.result(dagFuture, timeout.duration) } - private def buildGeneralTopologyContext(dag: DAG, topology: StormTopology, stormConf: JMap[_, _]): GeneralTopologyContext = { + private def buildGeneralTopologyContext(dag: DAG, topology: StormTopology, stormConf: JMap[_, _]) + : GeneralTopologyContext = { val taskToComponent = getTaskToComponent(dag) - val componentToSortedTasks: JMap[String, JList[Integer]] = getComponentToSortedTasks(taskToComponent) - val componentToStreamFields: JMap[String, JMap[String, Fields]] = getComponentToStreamFields(topology) - new GeneralTopologyContext(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamFields, null) + val componentToSortedTasks: JMap[String, JList[Integer]] = + getComponentToSortedTasks(taskToComponent) + val componentToStreamFields: JMap[String, JMap[String, Fields]] = + getComponentToStreamFields(topology) + new GeneralTopologyContext( + topology, stormConf, taskToComponent.asJava, componentToSortedTasks, + componentToStreamFields, null) } - private def buildTopologyContext(dag: DAG, topology: StormTopology, stormConf: JMap[_, _], stormTaskId: Integer): TopologyContext = { + private def buildTopologyContext( + dag: DAG, topology: StormTopology, stormConf: JMap[_, _], stormTaskId: Integer) + : TopologyContext = { val taskToComponent = getTaskToComponent(dag) - val componentToSortedTasks: JMap[String, JList[Integer]] = getComponentToSortedTasks(taskToComponent) - val componentToStreamFields: JMap[String, JMap[String, Fields]] = getComponentToStreamFields(topology) + val componentToSortedTasks: JMap[String, JList[Integer]] = + getComponentToSortedTasks(taskToComponent) + val componentToStreamFields: JMap[String, JMap[String, Fields]] = + getComponentToStreamFields(topology) val codeDir = mkCodeDir val pidDir = mkPidDir - new TopologyContext(topology, stormConf, taskToComponent, componentToSortedTasks, - componentToStreamFields, null, codeDir, pidDir, stormTaskId, null, null, null, null, new JHashMap[String, AnyRef], - new JHashMap[Integer, JMap[Integer, JMap[String, IMetric]]], new Atom(false)) + new TopologyContext(topology, stormConf, taskToComponent.asJava, componentToSortedTasks, + componentToStreamFields, null, codeDir, pidDir, stormTaskId, null, null, null, null, + new JHashMap[String, AnyRef], new JHashMap[Integer, JMap[Integer, JMap[String, IMetric]]], + new Atom(false)) } - private def getComponentToStreamFields(topology: StormTopology): JMap[String, JMap[String, Fields]] = { - val spouts = topology.get_spouts() - val bolts = topology.get_bolts() + private def getComponentToStreamFields(topology: StormTopology) + : JMap[String, JMap[String, Fields]] = { + val spouts = topology.get_spouts().asScala + val bolts = topology.get_bolts().asScala - (spouts.map { case (id, component) => id -> getComponentToFields(component.get_common()) } ++ - bolts.map { case (id, component) => id -> getComponentToFields(component.get_common())} ++ - Map(SYSTEM_COMPONENT_ID -> Map(SYSTEM_TICK_STREAM_ID -> new Fields(SYSTEM_COMPONENT_OUTPUT_FIELDS)).asJava) - ).toMap.asJava + val spoutFields = spouts.map { + case (id, component) => id -> getComponentToFields(component.get_common()) + } + + val boltFields = bolts.map { + case (id, component) => id -> getComponentToFields(component.get_common()) + } + + val systemFields = Map(SYSTEM_COMPONENT_ID -> + Map(SYSTEM_TICK_STREAM_ID -> new Fields(SYSTEM_COMPONENT_OUTPUT_FIELDS)).asJava) + + (spoutFields ++ boltFields ++ systemFields).asJava } private def getComponentToFields(common: ComponentCommon): JMap[String, Fields] = { - common.get_streams.map { case (sid, stream) => + val streams = common.get_streams.asScala + streams.map { case (sid, stream) => sid -> new Fields(stream.get_output_fields()) - }.toMap.asJava + }.asJava } - private def getComponentToSortedTasks(taskToComponent: Map[Integer, String]): JMap[String, JList[Integer]] = { + private def getComponentToSortedTasks( + taskToComponent: Map[Integer, String]): JMap[String, JList[Integer]] = { taskToComponent.groupBy(_._2).map { case (component, map) => val sortedTasks = map.keys.toList.sorted.asJava component -> sortedTasks @@ -307,12 +332,13 @@ object GearpumpStormComponent { val taskToComponent = dag.processors.flatMap { case (processorId, processorDescription) => val parallelism = processorDescription.parallelism val component = processorDescription.taskConf.getString(STORM_COMPONENT).get - (0 until parallelism).map(index => gearpumpTaskIdToStorm(TaskId(processorId, index)) -> component) + (0 until parallelism).map(index => + gearpumpTaskIdToStorm(TaskId(processorId, index)) -> component) } taskToComponent } - // a workaround to support storm ShellBolt + // Workarounds to support storm ShellBolt private def mkPidDir: String = { val pidDir = FileUtils.getTempDirectoryPath + File.separator + "pid" try { @@ -333,7 +359,7 @@ object GearpumpStormComponent { FileUtils.forceMkdir(new File(destDir)) val jar = new JarFile(jarPath) - val enumEntries = jar.entries() + val enumEntries = jar.entries().asScala enumEntries.foreach { entry => val file = new File(destDir + File.separator + entry.getName) if (!entry.isDirectory) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala index b88a200..5c3fc3e 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -25,6 +25,8 @@ import akka.actor.ActorSystem import backtype.storm.Config import backtype.storm.generated._ import backtype.storm.utils.{ThriftTopologyUtils, Utils} +import org.slf4j.Logger + import io.gearpump.cluster.UserConfig import io.gearpump.experiments.storm.processor.StormProcessor import io.gearpump.experiments.storm.producer.StormProducer @@ -34,9 +36,11 @@ import io.gearpump.experiments.storm.util.StormUtil._ import io.gearpump.streaming.Processor import io.gearpump.streaming.task.Task import io.gearpump.util.LogUtil -import org.slf4j.Logger +// TODO: Refactor this file, we should disable using of JavaConversions +// scalastyle:off javaconversions import scala.collection.JavaConversions._ +// scalastyle:on javaconversions object GearpumpStormTopology { private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpStormTopology]) @@ -51,16 +55,14 @@ object GearpumpStormTopology { Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]], parseJsonStringToMap(appConfigInJson) ) - } - } /** * this is a wrapper over Storm topology which - * 1. merges Storm and Gearpump configs - * 2. creates Gearpump processors - * 3. provides interface for Gearpump applications to use Storm topology + * 1. merges Storm and Gearpump configs + * 2. creates Gearpump processors + * 3. provides interface for Gearpump applications to use Storm topology * * an implicit ActorSystem is required to create Gearpump processors * @param name topology name @@ -78,14 +80,16 @@ private[storm] class GearpumpStormTopology( private val bolts = topology.get_bolts() private val stormConfig = mergeConfigs(sysConfig, appConfig, getComponentConfigs(spouts, bolts)) private val spoutProcessors = spouts.map { case (id, spout) => - id -> spoutToProcessor(id, spout, stormConfig.toMap) }.toMap + id -> spoutToProcessor(id, spout, stormConfig.toMap) + }.toMap private val boltProcessors = bolts.map { case (id, bolt) => - id -> boltToProcessor(id, bolt, stormConfig.toMap) }.toMap + id -> boltToProcessor(id, bolt, stormConfig.toMap) + }.toMap private val allProcessors = spoutProcessors ++ boltProcessors /** * @return merged Storm config with priority - * defaults.yaml < custom file config < application config < component config + * defaults.yaml < custom file config < application config < component config */ def getStormConfig: JMap[AnyRef, AnyRef] = stormConfig @@ -131,7 +135,7 @@ private[storm] class GearpumpStormTopology( stormConfig: Map[AnyRef, AnyRef])(implicit system: ActorSystem): Processor[Task] = { val componentCommon = spoutSpec.get_common() val taskConf = UserConfig.empty - .withString(STORM_COMPONENT, spoutId) + .withString(STORM_COMPONENT, spoutId) val parallelism = getParallelism(stormConfig, componentCommon) Processor[StormProducer](parallelism, spoutId, taskConf) } @@ -148,8 +152,8 @@ private[storm] class GearpumpStormTopology( stormConfig: Map[AnyRef, AnyRef])(implicit system: ActorSystem): Processor[Task] = { val componentCommon = boltSpec.get_common() val taskConf = UserConfig.empty - .withString(STORM_COMPONENT, boltId) - .withBoolean("state.checkpoint.enable", StormUtil.ackEnabled(stormConfig)) + .withString(STORM_COMPONENT, boltId) + .withBoolean("state.checkpoint.enable", StormUtil.ackEnabled(stormConfig)) val parallelism = getParallelism(stormConfig, componentCommon) Processor[StormProcessor](parallelism, boltId, taskConf) } @@ -157,7 +161,8 @@ private[storm] class GearpumpStormTopology( /** * @return target components and streams */ - private def getTargets(componentId: String, topology: StormTopology): Map[String, Map[String, Grouping]] = { + private def getTargets(componentId: String, topology: StormTopology) + : Map[String, Map[String, Grouping]] = { val componentIds = ThriftTopologyUtils.getComponentIds(topology) componentIds.flatMap { otherComponentId => getInputs(otherComponentId, topology).toList.map(otherComponentId -> _) @@ -178,16 +183,18 @@ private[storm] class GearpumpStormTopology( /** * @return input stream and grouping for a Storm component */ - private def getInputs(componentId: String, topology: StormTopology): JMap[GlobalStreamId, Grouping] = { + private def getInputs(componentId: String, topology: StormTopology) + : JMap[GlobalStreamId, Grouping] = { ThriftTopologyUtils.getComponentCommon(topology, componentId).get_inputs } /** * get Storm component parallelism according to the following rule, - * 1. use "topology.tasks" if defined; otherwise use parallelism_hint - * 2. parallelism should not be larger than "topology.max.task.parallelism" if defined - * 3. component config overrides system config - * @param stormConfig system configs without merging "topology.tasks" and "topology.max.task.parallelism" of component + * 1. use "topology.tasks" if defined; otherwise use parallelism_hint + * 2. parallelism should not be larger than "topology.max.task.parallelism" if defined + * 3. component config overrides system config + * @param stormConfig System configs without merging "topology.tasks" and + * "topology.max.task.parallelism" of component * @return number of task instances for a component */ private def getParallelism(stormConfig: Map[AnyRef, AnyRef], component: ComponentCommon): Int = { @@ -207,7 +214,7 @@ private[storm] class GearpumpStormTopology( } private def getComponentConfigs(spouts: JMap[String, SpoutSpec], - bolts: JMap[String, Bolt]): Iterable[JMap[AnyRef, AnyRef]] = { + bolts: JMap[String, Bolt]): Iterable[JMap[AnyRef, AnyRef]] = { spouts.map { case (id, spoutSpec) => parseJsonStringToMap(spoutSpec.get_common().get_json_conf()) } ++ bolts.map { case (id, boltSpec) => @@ -222,7 +229,7 @@ private[storm] class GearpumpStormTopology( * @return the two configs merged from all the component configs and existing configs */ private def getMergedComponentConfig(componentConfigs: Iterable[JMap[AnyRef, AnyRef]], - allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = { + allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = { val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef] mergedConfig.putAll(getMergedKryoDecorators(componentConfigs, allConfig)) mergedConfig.putAll(getMergedKryoRegister(componentConfigs, allConfig)) @@ -232,10 +239,11 @@ private[storm] class GearpumpStormTopology( /** * @param componentConfigs list of component configs * @param allConfig existing configs without merging component configs - * @return a merged config with a list of distinct kryo decorators from component and existing configs + * @return a merged config with a list of distinct kryo decorators from component and + * existing configs */ private def getMergedKryoDecorators(componentConfigs: Iterable[JMap[AnyRef, AnyRef]], - allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = { + allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = { val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef](1) val key = Config.TOPOLOGY_KRYO_DECORATORS val configs = getConfigValues(componentConfigs, allConfig, key) @@ -250,7 +258,6 @@ private[storm] class GearpumpStormTopology( accum case illegal => throw new IllegalArgumentException(s"$key must be a List of Strings; actually $illegal") - } if (distincts.nonEmpty) { val decorators: JList[String] = new JArrayList(distincts.size) @@ -266,7 +273,7 @@ private[storm] class GearpumpStormTopology( * @return a merged config with component config overriding existing configs */ private def getMergedKryoRegister(componentConfigs: Iterable[JMap[AnyRef, AnyRef]], - allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = { + allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = { val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef](1) val key = Config.TOPOLOGY_KRYO_REGISTER val configs = getConfigValues(componentConfigs, allConfig, key) @@ -275,7 +282,7 @@ private[storm] class GearpumpStormTopology( accum ++ config.map { case m: JMap[_, _] => m.map { - case (k: String, v: String) => k ->v + case (k: String, v: String) => k -> v case illegal => throw new IllegalArgumentException( s"each element of $key must be a String or a Map of Strings; actually $illegal") @@ -283,7 +290,8 @@ private[storm] class GearpumpStormTopology( case s: String => Map(s -> null) case illegal => - throw new IllegalArgumentException(s"each element of $key must be a String or a Map of Strings; actually $illegal") + throw new IllegalArgumentException(s"each element of $key must be a String or " + + s"a Map of Strings; actually $illegal") }.reduce(_ ++ _) case (accum, null) => accum @@ -294,7 +302,7 @@ private[storm] class GearpumpStormTopology( if (merged.nonEmpty) { val registers: JMap[String, String] = new JHashMap[String, String](merged.size) registers.putAll(merged) - mergedConfig.put(key, registers) + mergedConfig.put(key, registers) } mergedConfig } @@ -306,7 +314,7 @@ private[storm] class GearpumpStormTopology( * @return a list of values for a config from both component configs and existing configs */ private def getConfigValues(componentConfigs: Iterable[JMap[AnyRef, AnyRef]], - allConfig: Map[AnyRef, AnyRef], key: String): Iterable[AnyRef] = { + allConfig: Map[AnyRef, AnyRef], key: String): Iterable[AnyRef] = { componentConfigs.map(config => config.get(key)) ++ allConfig.get(key).toList } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala index 7662f36..ee54add 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,8 +22,8 @@ import java.util.{List => JList} import backtype.storm.task.GeneralTopologyContext import backtype.storm.tuple.{Tuple, TupleImpl} -import io.gearpump.TimeStamp +import io.gearpump.TimeStamp /** * this carries Storm tuple values in the Gearpump world @@ -36,9 +36,9 @@ private[storm] class GearpumpTuple( val sourceStreamId: String, @transient val targetPartitions: Map[String, Array[Int]]) extends Serializable { /** - * creates a Storm [[Tuple]] to be passed to a Storm component + * creates a Storm [[backtype.storm.tuple.Tuple]] to be passed to a Storm component * this is needed for each incoming message - * because we cannot get [[GeneralTopologyContext]] at deserialization + * because we cannot get [[backtype.storm.task.GeneralTopologyContext]] at deserialization * @param topologyContext topology context used for all tasks * @return a Tuple */ @@ -46,15 +46,14 @@ private[storm] class GearpumpTuple( TimedTuple(topologyContext, values, sourceTaskId, sourceStreamId, timestamp) } - def canEqual(other: Any): Boolean = other.isInstanceOf[GearpumpTuple] override def equals(other: Any): Boolean = other match { case that: GearpumpTuple => (that canEqual this) && - values == that.values && - sourceTaskId == that.sourceTaskId && - sourceStreamId == that.sourceStreamId + values == that.values && + sourceTaskId == that.sourceTaskId && + sourceStreamId == that.sourceStreamId case _ => false } @@ -68,4 +67,3 @@ case class TimedTuple(topologyContext: GeneralTopologyContext, tuple: JList[AnyR sourceTaskId: Integer, sourceStreamId: String, timestamp: TimeStamp) extends TupleImpl(topologyContext, tuple, sourceTaskId, sourceStreamId, null) - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/GraphBuilder.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/GraphBuilder.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/GraphBuilder.scala index e9f03f3..041a90c 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/GraphBuilder.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/GraphBuilder.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,7 +18,6 @@ package io.gearpump.experiments.storm.util - import io.gearpump.experiments.storm.partitioner.StormPartitioner import io.gearpump.experiments.storm.topology.GearpumpStormTopology import io.gearpump.partitioner.Partitioner http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala index d727b7a..91277cd 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,18 +19,15 @@ package io.gearpump.experiments.storm.util import java.util.{List => JList} +import scala.util.Random import backtype.storm.generated.GlobalStreamId import backtype.storm.grouping.CustomStreamGrouping import backtype.storm.task.TopologyContext import backtype.storm.tuple.Fields -import scala.collection.JavaConversions._ -import scala.util.Random - /** - * Grouper is identical to that in storm but return gearpump - * partitions for storm tuple values + * Grouper is identical to that in storm but return gearpump partitions for storm tuple values */ sealed trait Grouper { /** @@ -50,6 +47,7 @@ class GlobalGrouper extends Grouper { /** * NoneGrouper randomly returns partition + * * @param numTasks number of target tasks */ class NoneGrouper(numTasks: Int) extends Grouper { @@ -62,8 +60,8 @@ class NoneGrouper(numTasks: Int) extends Grouper { } /** - * ShuffleGrouper shuffles partitions and returns them sequentially, - * and then shuffles again + * ShuffleGrouper shuffles partitions and returns them sequentially, and then shuffles again + * * @param numTasks number of target tasks */ class ShuffleGrouper(numTasks: Int) extends Grouper { @@ -86,6 +84,7 @@ class ShuffleGrouper(numTasks: Int) extends Grouper { /** * FieldsGrouper returns partition based on value of groupFields + * * @param outFields declared output fields of source task * @param groupFields grouping fields of target tasks * @param numTasks number of target tasks @@ -101,6 +100,7 @@ class FieldsGrouper(outFields: Fields, groupFields: Fields, numTasks: Int) exten /** * AllGrouper returns all partitions + * * @param numTasks number of target tasks */ class AllGrouper(numTasks: Int) extends Grouper { @@ -113,16 +113,30 @@ class AllGrouper(numTasks: Int) extends Grouper { /** * CustomGrouper allows users to specify grouping strategy - * @param grouping see [[CustomStreamGrouping]] + * + * @param grouping see [[backtype.storm.grouping.CustomStreamGrouping]] */ class CustomGrouper(grouping: CustomStreamGrouping) extends Grouper { - def prepare(topologyContext: TopologyContext, globalStreamId: GlobalStreamId, targetTasks: JList[Integer]): Unit = { + def prepare( + topologyContext: TopologyContext, globalStreamId: GlobalStreamId, targetTasks: JList[Integer]) + : Unit = { grouping.prepare(topologyContext, globalStreamId, targetTasks) } override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = { - grouping.chooseTasks(taskId, values).map(StormUtil.stormTaskIdToGearpump(_).index).toArray + val tasks = grouping.chooseTasks(taskId, values) + val result = new Array[Int](tasks.size()) + + val iter = tasks.iterator() + + var index = 0 + while (iter.hasNext()) { + val value = iter.next() + result(index) = StormUtil.stormTaskIdToGearpump(value).index + index += 1 + } + result } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormConstants.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormConstants.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormConstants.scala index 58e7160..928d07b 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormConstants.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormConstants.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -27,7 +27,7 @@ object StormConstants { val SYSTEM_TASK_ID: Integer = -1 val SYSTEM_TICK_STREAM_ID = "__tick" - val CHECKPOINT_INTERVAL_MILLIS = 2000 // 2 seconds + val CHECKPOINT_INTERVAL_MILLIS = 2000 // 2 seconds val STORM_SERIALIZATION_FRAMEWORK = "gearpump.storm.serialization-framework" } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala index c1df63b..74d1b2b 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,21 +19,21 @@ package io.gearpump.experiments.storm.util import java.util.{ArrayList => JArrayList, Iterator => JIterator, List => JList, Map => JMap} +import scala.collection.JavaConverters._ import backtype.storm.generated.{GlobalStreamId, Grouping, JavaObject} import backtype.storm.grouping.CustomStreamGrouping import backtype.storm.task.TopologyContext import backtype.storm.tuple.Fields import backtype.storm.utils.Utils +import org.slf4j.Logger + import io.gearpump._ import io.gearpump.experiments.storm.topology.GearpumpTuple import io.gearpump.experiments.storm.util.StormUtil._ import io.gearpump.streaming.ProcessorId import io.gearpump.streaming.task.{TaskContext, TaskId} import io.gearpump.util.LogUtil -import org.slf4j.Logger - -import scala.collection.JavaConversions._ object StormOutputCollector { private val LOG: Logger = LogUtil.getLogger(classOf[StormOutputCollector]) @@ -43,18 +43,20 @@ object StormOutputCollector { val stormTaskId = topologyContext.getThisTaskId val componentId = topologyContext.getThisComponentId val taskToComponent = topologyContext.getTaskToComponent - val componentToProcessorId = getComponentToProcessorId(taskToComponent.toMap) + val componentToProcessorId = getComponentToProcessorId(taskToComponent.asScala.toMap) val targets = topologyContext.getTargets(componentId) val streamGroupers: Map[String, Grouper] = - targets.flatMap { case (streamId, targetGrouping) => - targetGrouping.collect { case (target, grouping) if !grouping.is_set_direct() => + targets.asScala.flatMap { case (streamId, targetGrouping) => + targetGrouping.asScala.collect { case (target, grouping) if !grouping.is_set_direct() => streamId -> getGrouper(topologyContext, grouping, componentId, streamId, target) } }.toMap val getTargetPartitionsFn = (streamId: String, values: JList[AnyRef]) => { - getTargetPartitions(stormTaskId, streamId, targets, streamGroupers, componentToProcessorId, values) + getTargetPartitions(stormTaskId, streamId, targets, + streamGroupers, componentToProcessorId, values) } - new StormOutputCollector(stormTaskId, taskToComponent, targets, getTargetPartitionsFn, taskContext, LatestTime) + new StormOutputCollector(stormTaskId, taskToComponent, targets, getTargetPartitionsFn, + taskContext, LatestTime) } /** @@ -66,7 +68,7 @@ object StormOutputCollector { targets: JMap[String, JMap[String, Grouping]], streamGroupers: Map[String, Grouper], componentToProcessorId: Map[String, ProcessorId], - values: JList[AnyRef]): (Map[String, Array[Int]], JList[Integer]) ={ + values: JList[AnyRef]): (Map[String, Array[Int]], JList[Integer]) = { val ret: JList[Integer] = new JArrayList[Integer](targets.size) @annotation.tailrec @@ -85,18 +87,20 @@ object StormOutputCollector { accum } } - val targetPartitions = getRecur(targets.get(streamId).keySet().iterator, Map.empty[String, Array[Int]]) + val targetPartitions = getRecur(targets.get(streamId).keySet().iterator, + Map.empty[String, Array[Int]]) (targetPartitions, ret) } - private def getComponentToProcessorId(taskToComponent: Map[Integer, String]): Map[String, ProcessorId] = { + private def getComponentToProcessorId(taskToComponent: Map[Integer, String]) + : Map[String, ProcessorId] = { taskToComponent.map { case (id, component) => component -> stormTaskIdToGearpump(id).processorId } } private def getGrouper(topologyContext: TopologyContext, grouping: Grouping, - source: String, streamId: String, target: String): Grouper = { + source: String, streamId: String, target: String): Grouper = { val outFields = topologyContext.getComponentOutputFields(source, streamId) val targetTasks = topologyContext.getComponentTasks(target) val targetTaskNum = targetTasks.size @@ -116,7 +120,8 @@ object StormOutputCollector { case Grouping._Fields.ALL => new AllGrouper(targetTaskNum) case Grouping._Fields.CUSTOM_SERIALIZED => - val customGrouping = Utils.javaDeserialize(grouping.get_custom_serialized, classOf[Serializable]).asInstanceOf[CustomStreamGrouping] + val customGrouping = Utils.javaDeserialize(grouping.get_custom_serialized, + classOf[Serializable]).asInstanceOf[CustomStreamGrouping] val grouper = new CustomGrouper(customGrouping) grouper.prepare(topologyContext, globalStreamId, targetTasks) grouper @@ -141,7 +146,7 @@ object StormOutputCollector { private def instantiateJavaObject(javaObject: JavaObject): CustomStreamGrouping = { val className = javaObject.get_full_class_name() - val args = javaObject.get_args_list().map(_.getFieldValue) + val args = javaObject.get_args_list().asScala.map(_.getFieldValue) val customGrouping = Class.forName(className).getConstructor(args.map(_.getClass): _*) .newInstance(args).asInstanceOf[CustomStreamGrouping] customGrouping @@ -149,7 +154,8 @@ object StormOutputCollector { } /** - * this provides common functionality for [[io.gearpump.experiments.storm.producer.StormSpoutOutputCollector]] + * Provides common functionality for + * [[io.gearpump.experiments.storm.producer.StormSpoutOutputCollector]] * and [[io.gearpump.experiments.storm.processor.StormBoltOutputCollector]] */ class StormOutputCollector( @@ -162,14 +168,15 @@ class StormOutputCollector( import io.gearpump.experiments.storm.util.StormOutputCollector._ /** - * this is invoked by a Storm output collector to emit tuple values into a stream. - * We will wrap the values into a message of [[GearpumpTuple]] along with the target partitions - * to tell [[io.gearpump.experiments.storm.partitioner.StormPartitioner]] where to send the message. - * We also return the corresponding target Storm task ids back to the collector + * Emits tuple values into a stream (invoked by a Storm output collector). + * + * wrapS the values into a message of [[GearpumpTuple]] along with the target partitions + * to tell [[io.gearpump.experiments.storm.partitioner.StormPartitioner]] where to send the + * message. We also return the corresponding target Storm task ids back to the collector * * @param streamId Storm stream id * @param values Storm tuple values - * @return target Storm task ids + * @return Target Storm task ids */ def emit(streamId: String, values: JList[AnyRef]): JList[Integer] = { if (targets.containsKey(streamId)) { @@ -183,10 +190,11 @@ class StormOutputCollector( } /** - * this is invoked by Storm output collector to emit tuple values to a specific Storm task. - * we translate the Storm task id into Gearpump TaskId and tell + * Emit tuple values to a specific Storm task (invoked by Storm output collector). + * + * We translate the Storm task id into Gearpump TaskId and tell * [[io.gearpump.experiments.storm.partitioner.StormPartitioner]] through the targetPartitions - * field of [[GearpumpTuple]] + * field of [[io.gearpump.experiments.storm.topology.GearpumpTuple]] * * @param id Storm task id * @param streamId Storm stream id @@ -203,8 +211,7 @@ class StormOutputCollector( } /** - * get timestamp from each incoming Message and - * which will be set into output messages + * set timestamp from each incoming Message if not attached. */ def setTimestamp(timestamp: TimeStamp): Unit = { this.timestamp = timestamp http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormSerializationFramework.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormSerializationFramework.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormSerializationFramework.scala index f306de6..f1c7bab 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormSerializationFramework.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormSerializationFramework.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,11 +19,13 @@ package io.gearpump.experiments.storm.util import java.lang.{Integer => JInteger} import java.util.{Map => JMap} + import akka.actor.ExtendedActorSystem import backtype.storm.serialization.SerializationFactory import backtype.storm.utils.ListDelegate import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.io.{Input, Output} + import io.gearpump.cluster.UserConfig import io.gearpump.experiments.storm.topology.GearpumpTuple import io.gearpump.experiments.storm.util.StormConstants._ @@ -50,8 +52,9 @@ class StormSerializationFramework extends SerializationFramework { } /** - * serializes / deserializes [[GearpumpTuple]] - * @param kryo created by Storm [[SerializationFactory]] + * serializes / deserializes [[io.gearpump.experiments.storm.topology.GearpumpTuple]] + * + * @param kryo created by Storm [[backtype.storm.serialization.SerializationFactory]] */ class StormSerializer(kryo: Kryo) extends Serializer { // -1 means the max buffer size is 2147483647 http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala index 4f4fced..554210c 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -24,21 +24,22 @@ import java.util.{HashMap => JHashMap, Map => JMap} import akka.actor.ActorSystem import backtype.storm.Config import backtype.storm.generated._ +import org.apache.storm.shade.org.json.simple.JSONValue + import io.gearpump.cluster.UserConfig import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout} import io.gearpump.experiments.storm.topology._ import io.gearpump.experiments.storm.util.StormConstants._ import io.gearpump.streaming.task.{TaskContext, TaskId} import io.gearpump.util.Util -import org.apache.storm.shade.org.json.simple.JSONValue object StormUtil { - /** - * convert storm task id to gearpump [[TaskId]] - * the high 16 bit of an Int is TaskId.processorId - * the low 16 bit of an Int is TaskId.index + * Convert storm task id to gearpump [[io.gearpump.streaming.task.TaskId]] + * + * The high 16 bit of an Int is TaskId.processorId + * The low 16 bit of an Int is TaskId.index */ def stormTaskIdToGearpump(id: Integer): TaskId = { val index = id & 0xFFFF @@ -57,11 +58,12 @@ object StormUtil { (processorId << 16) + (index & 0xFFFF) } - /** * @return a configured [[GearpumpStormComponent]] */ - def getGearpumpStormComponent(taskContext: TaskContext, conf: UserConfig)(implicit system: ActorSystem): GearpumpStormComponent = { + def getGearpumpStormComponent( + taskContext: TaskContext, conf: UserConfig)(implicit system: ActorSystem) + : GearpumpStormComponent = { val topology = conf.getValue[StormTopology](STORM_TOPOLOGY).get val stormConfig = conf.getValue[JMap[AnyRef, AnyRef]](STORM_CONFIG).get val componentId = conf.getString(STORM_COMPONENT).get @@ -77,8 +79,8 @@ object StormUtil { } /** - * parse config in json to map - * return empty map for invalid json string + * Parses config in json to map, returns empty map for invalid json string + * * @param json config in json * @return config in map */ @@ -95,7 +97,8 @@ object StormUtil { def getInt(conf: JMap[_, _], name: String): Option[Int] = { Option(conf.get(name)).map { case number: Number => number.intValue - case invalid => throw new IllegalArgumentException(s"$name must be Java Integer; actual: ${invalid.getClass}") + case invalid => throw new IllegalArgumentException( + s"$name must be Java Integer; actual: ${invalid.getClass}") } } @@ -105,7 +108,8 @@ object StormUtil { def getBoolean(conf: JMap[_, _], name: AnyRef): Option[Boolean] = { Option(conf.get(name)).map { case b: JBoolean => b.booleanValue() - case invalid => throw new IllegalArgumentException(s"$name must be a Java Boolean; acutal: ${invalid.getClass}") + case invalid => throw new IllegalArgumentException( + s"$name must be a Java Boolean; acutal: ${invalid.getClass}") } } @@ -126,7 +130,7 @@ object StormUtil { } def getThriftPort(): Int = { - Util.findFreePort.getOrElse( + Util.findFreePort().getOrElse( throw new Exception("unable to find free port for thrift server")) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala index 200807c..ed4a6bb 100644 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,13 +19,15 @@ package io.gearpump.experiments.storm.partitioner import java.util.{List => JList} -import io.gearpump.Message -import io.gearpump.experiments.storm.topology.GearpumpTuple -import io.gearpump.partitioner.Partitioner +import scala.collection.JavaConverters._ + import org.scalacheck.Gen import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import scala.collection.JavaConverters._ + +import io.gearpump.Message +import io.gearpump.experiments.storm.topology.GearpumpTuple +import io.gearpump.partitioner.Partitioner class StormPartitionerSpec extends PropSpec with PropertyChecks with Matchers { @@ -38,30 +40,28 @@ class StormPartitionerSpec extends PropSpec with PropertyChecks with Matchers { sourceTaskId <- idGen sourceStreamId <- Gen.alphaStr } yield (targetPartitions: Map[String, Array[Int]]) => { - new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, targetPartitions) - } + new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, targetPartitions) + } forAll(tupleFactoryGen, idGen, componentsGen, partitionsGen) { - (tupleFactory: Map[String, Array[Int]] => GearpumpTuple, id: Int, components: List[String], partitions: Array[Int]) => + (tupleFactory: Map[String, Array[Int]] => GearpumpTuple, id: Int, + components: List[String], partitions: Array[Int]) => { val currentPartitionId = id val targetPartitions = components.init.map(c => (c, partitions)).toMap val tuple = tupleFactory(targetPartitions) - targetPartitions.foreach { case (target, ps) => - val partitioner = new StormPartitioner(target) - partitioner.getPartitions(Message(tuple), ps.last + 1, - currentPartitionId) shouldBe ps + targetPartitions.foreach { + case (target, ps) => { + val partitioner = new StormPartitioner(target) + ps shouldBe partitioner.getPartitions(Message(tuple), ps.last + 1, currentPartitionId) + } } val partitionNum = id val nonTarget = components.last val partitioner = new StormPartitioner(nonTarget) - if (targetPartitions.contains(nonTarget)) { - println(targetPartitions) - } + partitioner.getPartitions(Message(tuple), partitionNum, currentPartitionId) shouldBe List(Partitioner.UNKNOWN_PARTITION_ID) - + } } } - - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala index d27a754..64cabd5 100644 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,19 +19,20 @@ package io.gearpump.experiments.storm.processor import java.util.{List => JList} +import scala.collection.JavaConverters._ import backtype.storm.tuple.Tuple import backtype.storm.utils.Utils -import io.gearpump.experiments.storm.util.StormOutputCollector import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import scala.collection.JavaConversions._ +import io.gearpump.experiments.storm.util.StormOutputCollector -class StormBoltOutputCollectorSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { +class StormBoltOutputCollectorSpec + extends PropSpec with PropertyChecks with Matchers with MockitoSugar { property("StormBoltOutputCollector should call StormOutputCollector") { val valGen = Gen.oneOf(Gen.alphaStr, Gen.alphaChar, Gen.chooseNum[Int](0, 1000)) @@ -41,8 +42,8 @@ class StormBoltOutputCollectorSpec extends PropSpec with PropertyChecks with Mat val collector = mock[StormOutputCollector] val boltCollector = new StormBoltOutputCollector(collector) val streamId = Utils.DEFAULT_STREAM_ID - boltCollector.emit(streamId, null, values) - verify(collector).emit(streamId, values) + boltCollector.emit(streamId, null, values.asJava) + verify(collector).emit(streamId, values.asJava) } } @@ -50,6 +51,6 @@ class StormBoltOutputCollectorSpec extends PropSpec with PropertyChecks with Mat val collector = mock[StormOutputCollector] val tuple = mock[Tuple] val boltCollector = new StormBoltOutputCollector(collector) - an [Exception] should be thrownBy boltCollector.fail(tuple) + an[Exception] should be thrownBy boltCollector.fail(tuple) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormProcessorSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormProcessorSpec.scala index 6a74b24..a3a8196 100644 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormProcessorSpec.scala +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormProcessorSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,14 +18,15 @@ package io.gearpump.experiments.storm.processor +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, WordSpec} + import io.gearpump.Message import io.gearpump.cluster.UserConfig -import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout} +import io.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt import io.gearpump.streaming.MockUtil import io.gearpump.streaming.task.StartTime -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{WordSpec, Matchers} class StormProcessorSpec extends WordSpec with Matchers with MockitoSugar { @@ -61,6 +62,5 @@ class StormProcessorSpec extends WordSpec with Matchers with MockitoSugar { verify(gearpumpBolt).tick(freq) } } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormProducerSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormProducerSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormProducerSpec.scala index 98ca2d5..8c10afc 100644 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormProducerSpec.scala +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormProducerSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,14 +19,15 @@ package io.gearpump.experiments.storm.producer import akka.testkit.TestProbe +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, WordSpec} + import io.gearpump.Message import io.gearpump.cluster.UserConfig import io.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout import io.gearpump.streaming.MockUtil import io.gearpump.streaming.task.StartTime -import org.scalatest.mock.MockitoSugar -import org.scalatest.{WordSpec, Matchers} -import org.mockito.Mockito._ class StormProducerSpec extends WordSpec with Matchers with MockitoSugar { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala index d202446..e638da9 100644 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,19 +19,20 @@ package io.gearpump.experiments.storm.producer import java.util.{List => JList} +import scala.collection.JavaConverters._ import backtype.storm.spout.ISpout import backtype.storm.utils.Utils -import io.gearpump.experiments.storm.util.StormOutputCollector import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import scala.collection.JavaConversions._ +import io.gearpump.experiments.storm.util.StormOutputCollector -class StormSpoutOutputCollectorSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { +class StormSpoutOutputCollectorSpec + extends PropSpec with PropertyChecks with Matchers with MockitoSugar { property("StormSpoutOutputCollector should call StormOutputCollector") { val valGen = Gen.oneOf(Gen.alphaStr, Gen.alphaChar, Gen.chooseNum[Int](0, 1000)) @@ -42,9 +43,8 @@ class StormSpoutOutputCollectorSpec extends PropSpec with PropertyChecks with Ma val spout = mock[ISpout] val streamId = Utils.DEFAULT_STREAM_ID val spoutCollector = new StormSpoutOutputCollector(collector, spout, false) - spoutCollector.emit(streamId, values, null) - verify(collector).emit(streamId, values) + spoutCollector.emit(streamId, values.asJava, null) + verify(collector).emit(streamId, values.asJava) } } - }
