http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala deleted file mode 100644 index 554210c..0000000 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.storm.util - -import java.lang.{Boolean => JBoolean} -import java.util.{HashMap => JHashMap, Map => JMap} - -import akka.actor.ActorSystem -import backtype.storm.Config -import backtype.storm.generated._ -import org.apache.storm.shade.org.json.simple.JSONValue - -import io.gearpump.cluster.UserConfig -import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout} -import io.gearpump.experiments.storm.topology._ -import io.gearpump.experiments.storm.util.StormConstants._ -import io.gearpump.streaming.task.{TaskContext, TaskId} -import io.gearpump.util.Util - -object StormUtil { - - /** - * Convert storm task id to gearpump [[io.gearpump.streaming.task.TaskId]] - * - * The high 16 bit of an Int is TaskId.processorId - * The low 16 bit of an Int is TaskId.index - */ - def stormTaskIdToGearpump(id: Integer): TaskId = { - val index = id & 0xFFFF - val processorId = id >> 16 - TaskId(processorId, index) - } - - /** - * convert gearpump [[TaskId]] to storm task id - * TaskId.processorId is the high 16 bit of an Int - * TaskId.index is the low 16 bit of an Int - */ - def gearpumpTaskIdToStorm(taskId: TaskId): Integer = { - val index = taskId.index - val processorId = taskId.processorId - (processorId << 16) + (index & 0xFFFF) - } - - /** - * @return a configured [[GearpumpStormComponent]] - */ - def getGearpumpStormComponent( - taskContext: TaskContext, conf: UserConfig)(implicit system: ActorSystem) - : GearpumpStormComponent = { - val topology = conf.getValue[StormTopology](STORM_TOPOLOGY).get - val stormConfig = conf.getValue[JMap[AnyRef, AnyRef]](STORM_CONFIG).get - val componentId = conf.getString(STORM_COMPONENT).get - val spouts = topology.get_spouts - val bolts = topology.get_bolts - if (spouts.containsKey(componentId)) { - GearpumpSpout(topology, stormConfig, spouts.get(componentId), taskContext) - } else if (bolts.containsKey(componentId)) { - GearpumpBolt(topology, stormConfig, 
bolts.get(componentId), taskContext) - } else { - throw new Exception(s"storm component $componentId not found") - } - } - - /** - * Parses config in json to map, returns empty map for invalid json string - * - * @param json config in json - * @return config in map - */ - def parseJsonStringToMap(json: String): JMap[AnyRef, AnyRef] = { - Option(json).flatMap(json => JSONValue.parse(json) match { - case m: JMap[_, _] => Option(m.asInstanceOf[JMap[AnyRef, AnyRef]]) - case _ => None - }).getOrElse(new JHashMap[AnyRef, AnyRef]) - } - - /** - * get Int value of the config name - */ - def getInt(conf: JMap[_, _], name: String): Option[Int] = { - Option(conf.get(name)).map { - case number: Number => number.intValue - case invalid => throw new IllegalArgumentException( - s"$name must be Java Integer; actual: ${invalid.getClass}") - } - } - - /** - * get Boolean value of the config name - */ - def getBoolean(conf: JMap[_, _], name: AnyRef): Option[Boolean] = { - Option(conf.get(name)).map { - case b: JBoolean => b.booleanValue() - case invalid => throw new IllegalArgumentException( - s"$name must be a Java Boolean; acutal: ${invalid.getClass}") - } - } - - /** - * clojure mod ported from Storm - * see https://clojuredocs.org/clojure.core/mod - */ - def mod(num: Int, div: Int): Int = { - (num % div + div) % div - } - - def ackEnabled(config: JMap[AnyRef, AnyRef]): Boolean = { - if (config.containsKey(Config.TOPOLOGY_ACKER_EXECUTORS)) { - getInt(config, Config.TOPOLOGY_ACKER_EXECUTORS).map(_ != 0).getOrElse(true) - } else { - false - } - } - - def getThriftPort(): Int = { - Util.findFreePort().getOrElse( - throw new Exception("unable to find free port for thrift server")) - } -}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/StormRunner.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/StormRunner.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/StormRunner.scala new file mode 100644 index 0000000..51760ba --- /dev/null +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/StormRunner.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.experiments.storm + +import org.apache.gearpump.experiments.storm.main.{GearpumpNimbus, GearpumpStormClient} +import org.apache.gearpump.util.LogUtil +import org.slf4j.Logger + +object StormRunner { + private val LOG: Logger = LogUtil.getLogger(getClass) + + private val commands = Map("nimbus" -> GearpumpNimbus, "app" -> GearpumpStormClient) + + private def usage(): Unit = { + val keys = commands.keys.toList.sorted + // scalastyle:off println + Console.err.println("Usage: " + "<" + keys.mkString("|") + ">") + // scalastyle:on println + } + + private def executeCommand(command: String, commandArgs: Array[String]): Unit = { + if (!commands.contains(command)) { + usage() + } else { + commands(command).main(commandArgs) + } + } + + def main(args: Array[String]): Unit = { + if (args.length == 0) { + usage() + } else { + val command = args(0) + val commandArgs = args.drop(1) + executeCommand(command, commandArgs) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala new file mode 100644 index 0000000..544a4eb --- /dev/null +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.experiments.storm.main + +import java.io.{File, FileOutputStream, FileWriter} +import java.nio.ByteBuffer +import java.nio.channels.{Channels, WritableByteChannel} +import java.util.{HashMap => JHashMap, Map => JMap, UUID} +import scala.collection.JavaConverters._ +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} + +import akka.actor.ActorSystem +import com.typesafe.config.ConfigValueFactory +import backtype.storm.Config +import backtype.storm.generated._ +import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer} +import backtype.storm.utils.Utils +import org.apache.storm.shade.org.json.simple.JSONValue +import org.apache.storm.shade.org.yaml.snakeyaml.Yaml +import org.slf4j.Logger + +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.cluster.{MasterToAppMaster, UserConfig} +import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopology +import org.apache.gearpump.experiments.storm.util.TimeCacheMapWrapper.Callback +import org.apache.gearpump.experiments.storm.util.{GraphBuilder, StormConstants, StormUtil, TimeCacheMapWrapper} +import org.apache.gearpump.streaming.StreamApplication +import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil} + +object GearpumpNimbus extends AkkaApp with ArgumentsParser { + private val THRIFT_PORT = StormUtil.getThriftPort() + private val OUTPUT = "output" + private val LOG: Logger = 
LogUtil.getLogger(classOf[GearpumpNimbus]) + + override val options: Array[(String, CLIOption[Any])] = Array( + OUTPUT -> CLIOption[String]("<output path for configuration file>", + required = false, defaultValue = Some("app.yaml")) + ) + + override def main(inputAkkaConf: Config, args: Array[String]): Unit = { + val parsed = parse(args) + val output = parsed.getString(OUTPUT) + val akkaConf = updateClientConfig(inputAkkaConf) + val system = ActorSystem("storm", akkaConf) + + val clientContext = new ClientContext(akkaConf, system, null) + val stormConf = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]] + val thriftConf: JMap[AnyRef, AnyRef] = Map( + Config.NIMBUS_HOST -> akkaConf.getString(Constants.GEARPUMP_HOSTNAME), + Config.NIMBUS_THRIFT_PORT -> s"$THRIFT_PORT").asJava.asInstanceOf[JMap[AnyRef, AnyRef]] + updateStormConfig(thriftConf, output) + stormConf.putAll(thriftConf) + + import scala.concurrent.ExecutionContext.Implicits.global + Future { + val thriftServer = createServer(clientContext, stormConf) + thriftServer.serve() + } + Await.result(system.whenTerminated, Duration.Inf) + } + + private def createServer( + clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]): ThriftServer = { + val processor = new Nimbus.Processor[GearpumpNimbus](new GearpumpNimbus(clientContext, + stormConf)) + val connectionType = ThriftConnectionType.NIMBUS + new ThriftServer(stormConf, processor, connectionType) + } + + private def updateStormConfig(thriftConfig: JMap[AnyRef, AnyRef], output: String): Unit = { + val updatedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef] + val outputConfig = Utils.findAndReadConfigFile(output, false).asInstanceOf[JMap[AnyRef, AnyRef]] + updatedConfig.putAll(outputConfig) + updatedConfig.putAll(thriftConfig) + val yaml = new Yaml + val serialized = yaml.dumpAsMap(updatedConfig) + val writer = new FileWriter(new File(output)) + try { + writer.write(serialized) + } catch { + case e: Exception => throw e + } finally { + 
writer.close() + } + } + + import org.apache.gearpump.util.Constants._ + private def updateClientConfig(config: Config): Config = { + val storm = s"<${GEARPUMP_HOME}>/lib/storm/*" + val appClassPath = s"$storm${File.pathSeparator}" + + config.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH) + val executorClassPath = s"$storm${File.pathSeparator}" + + config.getString(Constants.GEARPUMP_EXECUTOR_EXTRA_CLASSPATH) + + val updated = config + .withValue(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(appClassPath)) + .withValue(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH, + ConfigValueFactory.fromAnyRef(executorClassPath)) + + if (config.hasPath(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) { + val serializerConfig = ConfigValueFactory.fromAnyRef( + config.getString(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) + updated.withValue(GEARPUMP_SERIALIZER_POOL, serializerConfig) + } else { + updated + } + } +} + +class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]) + extends Nimbus.Iface { + + import org.apache.gearpump.experiments.storm.main.GearpumpNimbus._ + + private var applications = Map.empty[String, Int] + private var topologies = Map.empty[String, TopologyData] + private val expireSeconds = StormUtil.getInt(stormConf, + Config.NIMBUS_FILE_COPY_EXPIRATION_SECS).get + private val expiredCallback = new Callback[String, WritableByteChannel] { + override def expire(k: String, v: WritableByteChannel): Unit = { + v.close() + } + } + private val fileCacheMap = new TimeCacheMapWrapper[String, WritableByteChannel](expireSeconds, + expiredCallback) + + override def submitTopology( + name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology) + : Unit = { + submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, + new SubmitOptions(TopologyInitialStatus.ACTIVE)) + } + + override def submitTopologyWithOpts( + name: String, uploadedJarLocation: String, + jsonConf: String, topology: StormTopology, 
options: SubmitOptions): Unit = { + LOG.info(s"Submitted topology $name") + implicit val system = clientContext.system + val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf) + val stormConfig = gearpumpStormTopology.getStormConfig + val workerNum = StormUtil.getInt(stormConfig, Config.TOPOLOGY_WORKERS).getOrElse(1) + val processorGraph = GraphBuilder.build(gearpumpStormTopology) + val config = UserConfig.empty + .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology) + .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig) + val app = StreamApplication(name, processorGraph, config) + LOG.info(s"jar file uploaded to $uploadedJarLocation") + val appId = clientContext.submit(app, uploadedJarLocation, workerNum) + applications += name -> appId + topologies += name -> TopologyData(topology, stormConfig, uploadedJarLocation) + LOG.info(s"Storm Application $appId submitted") + } + + override def killTopologyWithOpts(name: String, options: KillOptions): Unit = { + if (applications.contains(name)) { + clientContext.shutdown(applications(name)) + removeTopology(name) + LOG.info(s"Killed topology $name") + } else { + throw new RuntimeException(s"topology $name not found") + } + } + + override def getNimbusConf: String = { + JSONValue.toJSONString(stormConf) + } + + override def getTopology(name: String): StormTopology = { + updateApps() + topologies.getOrElse(name, + throw new RuntimeException(s"topology $name not found")).topology + } + + override def getTopologyConf(name: String): String = { + updateApps() + JSONValue.toJSONString(topologies.getOrElse(name, + throw new RuntimeException(s"topology $name not found")).config) + } + + override def getUserTopology(id: String): StormTopology = getTopology(id) + + override def beginFileUpload(): String = { + val file = File.createTempFile(s"storm-jar-${UUID.randomUUID()}", ".jar") + val location = file.getAbsolutePath + val channel = Channels.newChannel(new 
FileOutputStream(location)) + fileCacheMap.put(location, channel) + LOG.info(s"Uploading file from client to $location") + location + } + + override def uploadChunk(location: String, chunk: ByteBuffer): Unit = { + if (!fileCacheMap.containsKey(location)) { + throw new RuntimeException(s"File for $location does not exist (or timed out)") + } else { + val channel = fileCacheMap.get(location) + channel.write(chunk) + fileCacheMap.put(location, channel) + } + } + + override def finishFileUpload(location: String): Unit = { + if (!fileCacheMap.containsKey(location)) { + throw new RuntimeException(s"File for $location does not exist (or timed out)") + } else { + val channel = fileCacheMap.get(location) + channel.close() + fileCacheMap.remove(location) + } + } + + override def getClusterInfo: ClusterSummary = { + updateApps() + val topologySummaryList = topologies.map { case (name, _) => + new TopologySummary(name, name, 0, 0, 0, 0, "") + }.toSeq + new ClusterSummary(List[SupervisorSummary]().asJava, 0, topologySummaryList.asJava) + } + + override def beginFileDownload(file: String): String = { + throw new UnsupportedOperationException + } + + override def uploadNewCredentials(s: String, credentials: Credentials): Unit = { + throw new UnsupportedOperationException + } + override def activate(name: String): Unit = { + throw new UnsupportedOperationException + } + + override def rebalance(name: String, options: RebalanceOptions): Unit = { + throw new UnsupportedOperationException + } + + override def deactivate(name: String): Unit = { + throw new UnsupportedOperationException + } + + override def getTopologyInfo(name: String): TopologyInfo = { + throw new UnsupportedOperationException + } + + override def getTopologyInfoWithOpts(s: String, getInfoOptions: GetInfoOptions): TopologyInfo = { + throw new UnsupportedOperationException + } + + override def killTopology(name: String): Unit = killTopologyWithOpts(name, new KillOptions()) + + override def downloadChunk(name: String): 
ByteBuffer = { + throw new UnsupportedOperationException + } + + private def updateApps(): Unit = { + clientContext.listApps.appMasters.foreach { app => + val name = app.appName + if (applications.contains(name)) { + if (app.status != MasterToAppMaster.AppMasterActive) { + removeTopology(name) + } + } + } + } + + private def removeTopology(name: String): Unit = { + applications -= name + val jar = topologies(name).jar + new File(jar).delete() + topologies -= name + } +} + +case class TopologyData(topology: StormTopology, config: JMap[AnyRef, AnyRef], jar: String) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpStormClient.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpStormClient.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpStormClient.scala new file mode 100644 index 0000000..1cfd5a4 --- /dev/null +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpStormClient.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.experiments.storm.main + +import backtype.storm.Config +import backtype.storm.utils.Utils +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.util.Constants._ +import org.apache.gearpump.util.{AkkaApp, LogUtil, Util} + +object GearpumpStormClient extends AkkaApp with ArgumentsParser { + + override val options: Array[(String, CLIOption[Any])] = Array( + "jar" -> CLIOption[String]("<storm jar>", required = true), + "config" -> CLIOption[Int]("<storm config file>", required = true), + "verbose" -> CLIOption("<print verbose log on console>", required = false, + defaultValue = Some(false))) + + override def main(inputAkkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + + val verbose = config.getBoolean("verbose") + if (verbose) { + LogUtil.verboseLogToConsole() + } + + val jar = config.getString("jar") + val stormConfig = config.getString("config") + val topology = config.remainArgs(0) + val stormArgs = config.remainArgs.drop(1) + val stormOptions = Array( + s"-Dstorm.options=${getThriftOptions(stormConfig)}", + s"-Dstorm.jar=$jar", + s"-Dstorm.conf.file=$stormConfig", + s"-D${PREFER_IPV4}=true" + ) + + val gearpumpHome = System.getProperty(GEARPUMP_HOME) + val classPath = Array(s"$gearpumpHome/lib/*", s"$gearpumpHome/lib/storm/*", jar) + val process = Util.startProcess(stormOptions, classPath, topology, stormArgs) + + // Waits till the process exit + val exit = process.exitValue() + + if (exit != 0) { + throw new Exception(s"failed to submit jar, exit code $exit, " + + s"error summary: ${process.logger.error}") + } + } + + private def getThriftOptions(stormConfig: String): String = { + val config = Utils.findAndReadConfigFile(stormConfig, true) + val host = config.get(Config.NIMBUS_HOST) + val thriftPort = config.get(Config.NIMBUS_THRIFT_PORT) + 
s"${Config.NIMBUS_HOST}=$host,${Config.NIMBUS_THRIFT_PORT}=$thriftPort" + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala new file mode 100644 index 0000000..aaa0a99 --- /dev/null +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.experiments.storm.partitioner + +import org.apache.gearpump.Message +import org.apache.gearpump.experiments.storm.topology.GearpumpTuple +import org.apache.gearpump.partitioner.{MulticastPartitioner, Partitioner} + +/** + * Partitioner bound to a target Storm component + * + * Partitioning is already done in + * [[org.apache.gearpump.experiments.storm.util.StormOutputCollector]] and + * kept in "targetPartitions" of [[org.apache.gearpump.experiments.storm.topology.GearpumpTuple]] + * the partitioner just returns the partitions of the target + * + * In Gearpump, a message is sent from a task to all the subscribers. + * In Storm, however, a message is sent to one or more of the subscribers. + * Hence, we have to do the partitioning in + * [[org.apache.gearpump.experiments.storm.util.StormOutputCollector]] till the Storm way is + * supported in Gearpump + * + * @param target target storm component id + */ +private[storm] class StormPartitioner(target: String) extends MulticastPartitioner { + + override def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int) + : Array[Int] = { + val stormTuple = msg.msg.asInstanceOf[GearpumpTuple] + stormTuple.targetPartitions.getOrElse(target, Array(Partitioner.UNKNOWN_PARTITION_ID)) + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala new file mode 100644 index 0000000..a70ce48 --- /dev/null +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala @@ -0,0 +1,75 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.experiments.storm.processor + +import java.util.{Collection => JCollection, List => JList} + +import backtype.storm.task.IOutputCollector +import backtype.storm.tuple.Tuple +import org.apache.gearpump.experiments.storm.topology.TimedTuple +import org.apache.gearpump.experiments.storm.util.StormConstants._ +import org.apache.gearpump.experiments.storm.util.StormOutputCollector +import org.apache.gearpump.streaming.task.ReportCheckpointClock + +/** + * this is used by Storm bolt to emit messages + */ +private[storm] class StormBoltOutputCollector(collector: StormOutputCollector, + ackEnabled: Boolean = false) extends IOutputCollector { + private var reportTime = 0L + private var maxAckTime = 0L + + override def emit( + streamId: String, anchors: JCollection[Tuple], tuple: JList[AnyRef]): JList[Integer] = { + collector.emit(streamId, tuple) + } + + override def emitDirect( + taskId: Int, streamId: String, anchors: JCollection[Tuple], tuple: JList[AnyRef]): Unit = { + collector.emitDirect(taskId, streamId, tuple) + } + + override def fail(tuple: Tuple): Unit = { + // application failure, throw exception such that the tuple can be replayed + // 
Note: do not print the tuple which will trigger NPE since its messageId is null + throw new Exception("Storm Bolt.execute failed") + } + + override def ack(tuple: Tuple): Unit = { + if (ackEnabled) { + tuple match { + case timedTuple: TimedTuple => + maxAckTime = Math.max(maxAckTime, timedTuple.timestamp) + val taskContext = collector.taskContext + val upstreamMinClock = taskContext.upstreamMinClock + if (reportTime <= upstreamMinClock && upstreamMinClock <= maxAckTime) { + reportTime = upstreamMinClock / CHECKPOINT_INTERVAL_MILLIS * CHECKPOINT_INTERVAL_MILLIS + taskContext.appMaster ! ReportCheckpointClock(taskContext.taskId, reportTime) + reportTime += CHECKPOINT_INTERVAL_MILLIS + } + case _ => + // ignore other tuples + } + } + } + + override def reportError(throwable: Throwable): Unit = { + throw throwable + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala new file mode 100644 index 0000000..1d3048e --- /dev/null +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.experiments.storm.processor + +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.Duration + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt +import org.apache.gearpump.experiments.storm.util._ +import org.apache.gearpump.streaming.task._ + +object StormProcessor { + private[storm] val TICK = Message("tick") +} + +/** + * this is runtime container for Storm bolt + */ +private[storm] class StormProcessor(gearpumpBolt: GearpumpBolt, + taskContext: TaskContext, conf: UserConfig) + extends Task(taskContext, conf) { + import org.apache.gearpump.experiments.storm.processor.StormProcessor._ + + def this(taskContext: TaskContext, conf: UserConfig) = { + this(StormUtil.getGearpumpStormComponent(taskContext, conf)(taskContext.system) + .asInstanceOf[GearpumpBolt], taskContext, conf) + } + + private val freqOpt = gearpumpBolt.getTickFrequency + + override def onStart(startTime: StartTime): Unit = { + gearpumpBolt.start(startTime) + freqOpt.foreach(scheduleTick) + } + + override def onNext(message: Message): Unit = { + message match { + case TICK => + freqOpt.foreach { freq => + gearpumpBolt.tick(freq) + scheduleTick(freq) + } + case _ => + gearpumpBolt.next(message) + } + } + + private def scheduleTick(freq: Int): Unit = { + taskContext.scheduleOnce(Duration(freq, TimeUnit.SECONDS)) { + self ! 
TICK + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala new file mode 100644 index 0000000..5d4a6a2 --- /dev/null +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.gearpump.experiments.storm.producer

import java.util.concurrent.TimeUnit

import akka.actor.Actor.Receive
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout
import org.apache.gearpump.experiments.storm.util._
import org.apache.gearpump.streaming.task._

import scala.concurrent.duration.Duration

object StormProducer {
  // Marker message the task sends to itself (see scheduleTimeout) to check message timeouts.
  private[storm] val TIMEOUT = Message("timeout")
}

/**
 * Runtime container that hosts a Storm spout inside a Gearpump task.
 *
 * The task keeps itself running by sending itself a message after every
 * `onNext`; each non-TIMEOUT message is forwarded to the wrapped spout.
 *
 * @param gearpumpSpout wrapper around the Storm spout
 * @param taskContext context of the hosting task
 * @param conf user config of the hosting task
 */
private[storm] class StormProducer(gearpumpSpout: GearpumpSpout,
    taskContext: TaskContext, conf: UserConfig)
  extends Task(taskContext, conf) {
  import org.apache.gearpump.experiments.storm.producer.StormProducer._

  /**
   * Builds the spout wrapper from the task's config via
   * StormUtil.getGearpumpStormComponent and casts it to a GearpumpSpout.
   */
  def this(taskContext: TaskContext, conf: UserConfig) = {
    this(StormUtil.getGearpumpStormComponent(taskContext, conf)(taskContext.system)
      .asInstanceOf[GearpumpSpout], taskContext, conf)
  }

  // Message timeout in milliseconds; None when the spout reports no timeout.
  private val timeoutMillis = gearpumpSpout.getMessageTimeout

  /**
   * Starts the spout, begins polling the checkpoint clock when acking is enabled,
   * arms the timeout check and sends the first self-message.
   */
  override def onStart(startTime: StartTime): Unit = {
    gearpumpSpout.start(startTime)
    if (gearpumpSpout.ackEnabled) {
      getCheckpointClock()
    }
    for (millis <- timeoutMillis) scheduleTimeout(millis)
    self ! Message("start")
  }

  /**
   * Handles TIMEOUT by asking the spout to time out pending messages and re-arming
   * the timer; any other message is passed to the spout. Always sends a follow-up
   * message to itself afterwards.
   */
  override def onNext(msg: Message): Unit = {
    msg match {
      case TIMEOUT =>
        for (millis <- timeoutMillis) {
          gearpumpSpout.timeout(millis)
          scheduleTimeout(millis)
        }
      case other =>
        gearpumpSpout.next(other)
    }
    self ! Message("continue")
  }

  /**
   * Handles the CheckpointClock reply: checkpoints the spout when a clock is
   * present and schedules the next clock request either way.
   */
  override def receiveUnManagedMessage: Receive = {
    case CheckpointClock(optClock) =>
      for (clock <- optClock) gearpumpSpout.checkpoint(clock)
      getCheckpointClock()
  }

  /**
   * Schedules a single GetCheckpointClock request to the app master after
   * StormConstants.CHECKPOINT_INTERVAL_MILLIS milliseconds.
   */
  def getCheckpointClock(): Unit = {
    val interval = Duration(StormConstants.CHECKPOINT_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
    taskContext.scheduleOnce(interval)(taskContext.appMaster ! GetCheckpointClock)
  }

  /** Schedules a single TIMEOUT to this task after `timeout` milliseconds. */
  private def scheduleTimeout(timeout: Long): Unit = {
    val delay = Duration(timeout, TimeUnit.MILLISECONDS)
    taskContext.scheduleOnce(delay) {
      self ! TIMEOUT
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala new file mode 100644 index 0000000..5794b1d --- /dev/null +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.gearpump.experiments.storm.producer

import java.util.{List => JList}

import backtype.storm.spout.{ISpout, ISpoutOutputCollector}
import org.apache.gearpump.TimeStamp
import org.apache.gearpump.experiments.storm.util.StormOutputCollector

/**
 * A spout message waiting to be acked or failed.
 *
 * @param id message id handed to `emit` / `emitDirect`
 * @param messageTime timestamp the message was emitted with
 * @param startTime wall-clock time used for the timeout check
 */
case class PendingMessage(id: Object, messageTime: TimeStamp, startTime: TimeStamp)

/**
 * Collector a Storm spout emits through.
 *
 * With acking enabled it tracks at most two messages (`pendingMessage` and
 * `nextPendingMessage`); every other emitted message is acked immediately.
 * With acking disabled every message is acked immediately.
 */
private[storm] class StormSpoutOutputCollector(
    collector: StormOutputCollector, spout: ISpout, ackEnabled: Boolean)
  extends ISpoutOutputCollector {

  // latest clock passed to ackPendingMessage
  private var checkpointClock = 0L
  private var pendingMessage: Option[PendingMessage] = None
  private var nextPendingMessage: Option[PendingMessage] = None

  /** Emits `values` stamped with the current time and returns the target task ids. */
  override def emit(streamId: String, values: JList[AnyRef], messageId: Object): JList[Integer] = {
    val now = stampCurrentTime()
    val outTasks = collector.emit(streamId, values)
    setPendingOrAck(messageId, now, now)
    outTasks
  }

  /** Rethrows the reported error. */
  override def reportError(throwable: Throwable): Unit = {
    throw throwable
  }

  /** Emits `values` stamped with the current time directly to task `taskId`. */
  override def emitDirect(taskId: Int, streamId: String, values: JList[AnyRef], messageId: Object)
    : Unit = {
    val now = stampCurrentTime()
    collector.emitDirect(taskId, streamId, values)
    setPendingOrAck(messageId, now, now)
  }

  /**
   * Records the new checkpoint clock. When a next pending message exists and its
   * message time is not after the clock, acks the current pending message and
   * promotes the next one.
   */
  def ackPendingMessage(checkpointClock: TimeStamp): Unit = {
    this.checkpointClock = checkpointClock
    for {
      upcoming <- nextPendingMessage
      if upcoming.messageTime <= this.checkpointClock
      current <- pendingMessage
    } {
      spout.ack(current.id)
      reset()
    }
  }

  /**
   * Fails the current pending message once at least `timeoutMillis` have elapsed
   * since its start time, then promotes the next pending message.
   */
  def failPendingMessage(timeoutMillis: Long): Unit = {
    pendingMessage.foreach { current =>
      val elapsed = System.currentTimeMillis() - current.startTime
      if (elapsed >= timeoutMillis) {
        spout.fail(current.id)
        reset()
      }
    }
  }

  /** Reads the wall clock once, sets it as the collector's timestamp and returns it. */
  private def stampCurrentTime(): Long = {
    val curTime = System.currentTimeMillis()
    collector.setTimestamp(curTime)
    curTime
  }

  /** Promotes the next pending message to pending and clears the next slot. */
  private def reset(): Unit = {
    pendingMessage = nextPendingMessage
    nextPendingMessage = None
  }

  /**
   * Decides what happens to a freshly emitted message:
   *  - acking disabled: ack at once
   *  - no pending message: it becomes the pending message
   *  - pending message already covered by the checkpoint clock and the next slot
   *    is free: it becomes the next pending message
   *  - otherwise: ack at once
   */
  private def setPendingOrAck(messageId: Object, startTime: TimeStamp, messageTime: TimeStamp)
    : Unit = {
    if (!ackEnabled) {
      spout.ack(messageId)
    } else {
      val candidate = PendingMessage(messageId, messageTime, startTime)
      pendingMessage match {
        case None =>
          pendingMessage = Some(candidate)
        case Some(current)
          if nextPendingMessage.isEmpty && current.messageTime <= this.checkpointClock =>
          nextPendingMessage = Some(candidate)
        case Some(_) =>
          spout.ack(messageId)
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala new file mode 100644 index 0000000..d0f2949 --- /dev/null +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.gearpump.experiments.storm.topology

import java.io.{File, FileOutputStream, IOException}
import java.util
import java.util.jar.JarFile
import java.util.{HashMap => JHashMap, List => JList, Map => JMap}

import akka.actor.ActorRef
import akka.pattern.ask
import backtype.storm.Config
import backtype.storm.generated.{Bolt, ComponentCommon, SpoutSpec, StormTopology}
import backtype.storm.metric.api.IMetric
import backtype.storm.spout.{ISpout, SpoutOutputCollector}
import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, TopologyContext}
import backtype.storm.tuple.{Fields, Tuple, TupleImpl}
import backtype.storm.utils.Utils
import clojure.lang.Atom
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.gearpump.experiments.storm.processor.StormBoltOutputCollector
import org.apache.gearpump.experiments.storm.producer.StormSpoutOutputCollector
import org.apache.gearpump.experiments.storm.util.StormConstants._
import org.apache.gearpump.experiments.storm.util.StormUtil._
import org.apache.gearpump.experiments.storm.util.{StormOutputCollector, StormUtil}
import org.apache.gearpump.streaming.DAG
import org.apache.gearpump.streaming.task.{GetDAG, TaskId, TaskContext, StartTime}
import org.apache.gearpump.util.{Constants, LogUtil}
import org.apache.gearpump.{Message, TimeStamp}
import org.slf4j.Logger

import scala.collection.JavaConverters._
import scala.concurrent.{Await, Future}

/**
 * subclass wraps Storm Spout and Bolt, and their lifecycles
 * hides the complexity from Gearpump applications
 */
trait GearpumpStormComponent {
  /**
   * invoked at Task.onStart
   * @param startTime task start time
   */
  def start(startTime: StartTime): Unit

  /**
   * invoked at Task.onNext
   * @param message incoming message
   */
  def next(message: Message): Unit

  /**
   * invoked at Task.onStop
   */
  def stop(): Unit = {}
}

object GearpumpStormComponent {
  private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpStormComponent])

  object GearpumpSpout {
    /**
     * Builds a [[GearpumpSpout]] from a Storm spout spec.
     *
     * The spout object is deserialized from the spec, the config is normalized with
     * the component's own config, and factories for the topology context and output
     * collector are prepared so that they can be evaluated lazily at `start`.
     * A warning is logged when acking is enabled for a spout that is not known to be
     * sequentially replayable.
     */
    def apply(topology: StormTopology, config: JMap[AnyRef, AnyRef],
        spoutSpec: SpoutSpec, taskContext: TaskContext): GearpumpSpout = {
      val componentCommon = spoutSpec.get_common()
      val scalaMap = config.asScala.toMap // Convert to scala immutable map
      val normalizedConfig = normalizeConfig(scalaMap, componentCommon)
      val getTopologyContext = (dag: DAG, taskId: TaskId) => {
        val stormTaskId = gearpumpTaskIdToStorm(taskId)
        buildTopologyContext(dag, topology, normalizedConfig, stormTaskId)
      }
      val spout = Utils.getSetComponentObject(spoutSpec.get_spout_object()).asInstanceOf[ISpout]
      val ackEnabled = StormUtil.ackEnabled(config)
      if (ackEnabled) {
        val className = spout.getClass.getName
        if (!isSequentiallyReplayableSpout(className)) {
          LOG.warn(s"at least once is not supported for $className")
        }
      }
      val getOutputCollector = (taskContext: TaskContext, topologyContext: TopologyContext) => {
        new StormSpoutOutputCollector(
          StormOutputCollector(taskContext, topologyContext), spout, ackEnabled)
      }
      GearpumpSpout(
        normalizedConfig,
        spout,
        askAppMasterForDAG,
        getTopologyContext,
        getOutputCollector,
        ackEnabled,
        taskContext)
    }

    // Only the Kafka spout is treated as sequentially replayable.
    private def isSequentiallyReplayableSpout(className: String): Boolean = {
      className.equals("storm.kafka.KafkaSpout")
    }
  }

  /**
   * Wraps a Storm spout and drives its lifecycle from a Gearpump task.
   *
   * @param config normalized Storm config passed to `ISpout.open`
   * @param spout the Storm spout
   * @param getDAG fetches the application DAG from the app master
   * @param getTopologyContext builds the Storm topology context for a task
   * @param getOutputCollector builds the collector the spout emits through
   * @param ackEnabled whether acking is enabled
   * @param taskContext context of the hosting task
   */
  case class GearpumpSpout(
      config: JMap[AnyRef, AnyRef],
      spout: ISpout,
      getDAG: ActorRef => DAG,
      getTopologyContext: (DAG, TaskId) => TopologyContext,
      getOutputCollector: (TaskContext, TopologyContext) => StormSpoutOutputCollector,
      ackEnabled: Boolean,
      taskContext: TaskContext)
    extends GearpumpStormComponent {

    // set in start(); null until then
    private var collector: StormSpoutOutputCollector = null

    override def start(startTime: StartTime): Unit = {
      val dag = getDAG(taskContext.appMaster)
      val topologyContext = getTopologyContext(dag, taskContext.taskId)
      collector = getOutputCollector(taskContext, topologyContext)
      spout.open(config, topologyContext, new SpoutOutputCollector(collector))
    }

    // the incoming message is only a trigger; its content is not used
    override def next(message: Message): Unit = {
      spout.nextTuple()
    }

    /**
     * @return timeout in milliseconds if enabled
     */
    def getMessageTimeout: Option[Long] = {
      StormUtil.getBoolean(config, Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS).flatMap {
        timeoutEnabled =>
          if (timeoutEnabled) {
            StormUtil.getInt(config, Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).map(_ * 1000L)
          } else {
            None
          }
      }
    }

    /** Forwards the checkpoint clock to the collector so it can ack pending messages. */
    def checkpoint(clock: TimeStamp): Unit = {
      collector.ackPendingMessage(clock)
    }

    /** Asks the collector to fail the pending message if it exceeded `timeoutMillis`. */
    def timeout(timeoutMillis: Long): Unit = {
      collector.failPendingMessage(timeoutMillis)
    }
  }

  object GearpumpBolt {
    /**
     * Builds a [[GearpumpBolt]] from a Storm bolt spec.
     *
     * The bolt object is deserialized from the spec, the config is normalized with
     * the component's own config, and factories for topology contexts, the output
     * collector and the tick tuple are prepared for later use.
     */
    def apply(topology: StormTopology, config: JMap[AnyRef, AnyRef],
        boltSpec: Bolt, taskContext: TaskContext): GearpumpBolt = {
      val configAsScalaMap = config.asScala.toMap // Convert to scala immutable map
      val normalizedConfig = normalizeConfig(configAsScalaMap, boltSpec.get_common())
      val getTopologyContext = (dag: DAG, taskId: TaskId) => {
        val stormTaskId = gearpumpTaskIdToStorm(taskId)
        buildTopologyContext(dag, topology, normalizedConfig, stormTaskId)
      }
      val getGeneralTopologyContext = (dag: DAG) => {
        buildGeneralTopologyContext(dag, topology, normalizedConfig)
      }
      val getOutputCollector = (taskContext: TaskContext, topologyContext: TopologyContext) => {
        StormOutputCollector(taskContext, topologyContext)
      }
      val getTickTuple = (topologyContext: GeneralTopologyContext, freq: Int) => {

        val values = new util.ArrayList[Object] // To be compatible with Java interface
        values.add(freq.asInstanceOf[java.lang.Integer])
        new TupleImpl(topologyContext, values, SYSTEM_TASK_ID, SYSTEM_TICK_STREAM_ID, null)
      }
      GearpumpBolt(
        normalizedConfig,
        Utils.getSetComponentObject(boltSpec.get_bolt_object()).asInstanceOf[IBolt],
        askAppMasterForDAG,
        getTopologyContext,
        getGeneralTopologyContext,
        getOutputCollector,
        getTickTuple,
        taskContext)
    }
  }

  /**
   * Wraps a Storm bolt and drives its lifecycle from a Gearpump task.
   *
   * @param config normalized Storm config passed to `IBolt.prepare`
   * @param bolt the Storm bolt
   * @param getDAG fetches the application DAG from the app master
   * @param getTopologyContext builds the Storm topology context for a task
   * @param getGeneralTopologyContext builds the general topology context used for tuples
   * @param getOutputCollector builds the collector the bolt emits through
   * @param getTickTuple builds the tick tuple for a given frequency
   * @param taskContext context of the hosting task
   */
  case class GearpumpBolt(
      config: JMap[AnyRef, AnyRef],
      bolt: IBolt,
      getDAG: ActorRef => DAG,
      getTopologyContext: (DAG, TaskId) => TopologyContext,
      getGeneralTopologyContext: DAG => GeneralTopologyContext,
      getOutputCollector: (TaskContext, TopologyContext) => StormOutputCollector,
      getTickTuple: (GeneralTopologyContext, Int) => Tuple,
      taskContext: TaskContext)
    extends GearpumpStormComponent {

    // all of these are set in start() (tickTuple on first tick); null until then
    private var collector: StormOutputCollector = null
    private var topologyContext: TopologyContext = null
    private var generalTopologyContext: GeneralTopologyContext = null
    private var tickTuple: Tuple = null

    override def start(startTime: StartTime): Unit = {
      val dag = getDAG(taskContext.appMaster)
      topologyContext = getTopologyContext(dag, taskContext.taskId)
      generalTopologyContext = getGeneralTopologyContext(dag)
      collector = getOutputCollector(taskContext, topologyContext)
      val delegate = new StormBoltOutputCollector(collector, StormUtil.ackEnabled(config))
      bolt.prepare(config, topologyContext, new OutputCollector(delegate))
    }

    override def next(message: Message): Unit = {
      val timestamp = message.timestamp
      collector.setTimestamp(timestamp)
      bolt.execute(message.msg.asInstanceOf[GearpumpTuple].toTuple(generalTopologyContext,
        timestamp))
    }

    def getTickFrequency: Option[Int] = {
      StormUtil.getInt(config, Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS)
    }

    /**
     * invoked at TICK message when "topology.tick.tuple.freq.secs" is configured
     * @param freq tick frequency
     */
    def tick(freq: Int): Unit = {
      // the tick tuple is built once and reused for every later tick
      if (null == tickTuple) {
        tickTuple = getTickTuple(generalTopologyContext, freq)
      }
      bolt.execute(tickTuple)
    }
  }

  /**
   * normalize general config with per component configs
   * "topology.transactional.id" and "topology.tick.tuple.freq.secs"
   * @param stormConfig general config for all components
   * @param componentCommon common component parts
   */
  private def normalizeConfig(stormConfig: Map[AnyRef, AnyRef],
      componentCommon: ComponentCommon): JMap[AnyRef, AnyRef] = {
    val config: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
    config.putAll(stormConfig.asJava)
    val componentConfig = parseJsonStringToMap(componentCommon.get_json_conf())
    Option(componentConfig.get(Config.TOPOLOGY_TRANSACTIONAL_ID))
      .foreach(config.put(Config.TOPOLOGY_TRANSACTIONAL_ID, _))
    Option(componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS))
      .foreach(config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, _))
    config
  }

  /** Asks the app master for the DAG and blocks for up to Constants.FUTURE_TIMEOUT. */
  private def askAppMasterForDAG(appMaster: ActorRef): DAG = {
    implicit val timeout = Constants.FUTURE_TIMEOUT
    val dagFuture = (appMaster ? GetDAG).asInstanceOf[Future[DAG]]
    Await.result(dagFuture, timeout.duration)
  }

  private def buildGeneralTopologyContext(dag: DAG, topology: StormTopology, stormConf: JMap[_, _])
    : GeneralTopologyContext = {
    val taskToComponent = getTaskToComponent(dag)
    val componentToSortedTasks: JMap[String, JList[Integer]] =
      getComponentToSortedTasks(taskToComponent)
    val componentToStreamFields: JMap[String, JMap[String, Fields]] =
      getComponentToStreamFields(topology)
    new GeneralTopologyContext(
      topology, stormConf, taskToComponent.asJava, componentToSortedTasks,
      componentToStreamFields, null)
  }

  private def buildTopologyContext(
      dag: DAG, topology: StormTopology, stormConf: JMap[_, _], stormTaskId: Integer)
    : TopologyContext = {
    val taskToComponent = getTaskToComponent(dag)
    val componentToSortedTasks: JMap[String, JList[Integer]] =
      getComponentToSortedTasks(taskToComponent)
    val componentToStreamFields: JMap[String, JMap[String, Fields]] =
      getComponentToStreamFields(topology)
    val codeDir = mkCodeDir
    val pidDir = mkPidDir

    new TopologyContext(topology, stormConf, taskToComponent.asJava, componentToSortedTasks,
      componentToStreamFields, null, codeDir, pidDir, stormTaskId, null, null, null, null,
      new JHashMap[String, AnyRef], new JHashMap[Integer, JMap[Integer, JMap[String, IMetric]]],
      new Atom(false))
  }

  /** Maps every spout, bolt and the system component to its stream-to-fields map. */
  private def getComponentToStreamFields(topology: StormTopology)
    : JMap[String, JMap[String, Fields]] = {
    val spouts = topology.get_spouts().asScala
    val bolts = topology.get_bolts().asScala

    val spoutFields = spouts.map {
      case (id, component) => id -> getComponentToFields(component.get_common())
    }

    val boltFields = bolts.map {
      case (id, component) => id -> getComponentToFields(component.get_common())
    }

    val systemFields = Map(SYSTEM_COMPONENT_ID ->
      Map(SYSTEM_TICK_STREAM_ID -> new Fields(SYSTEM_COMPONENT_OUTPUT_FIELDS)).asJava)

    (spoutFields ++ boltFields ++ systemFields).asJava
  }

  private def getComponentToFields(common: ComponentCommon): JMap[String, Fields] = {
    val streams = common.get_streams.asScala
    streams.map { case (sid, stream) =>
      sid -> new Fields(stream.get_output_fields())
    }.asJava
  }

  /** Groups Storm task ids by component, each group sorted ascending. */
  private def getComponentToSortedTasks(
      taskToComponent: Map[Integer, String]): JMap[String, JList[Integer]] = {
    taskToComponent.groupBy(_._2).map { case (component, map) =>
      val sortedTasks = map.keys.toList.sorted.asJava
      component -> sortedTasks
    }.asJava
  }

  /** Maps the Storm task id of every task of every processor in the DAG to its component. */
  private def getTaskToComponent(dag: DAG): Map[Integer, String] = {
    val taskToComponent = dag.processors.flatMap { case (processorId, processorDescription) =>
      val parallelism = processorDescription.parallelism
      val component = processorDescription.taskConf.getString(STORM_COMPONENT).get
      (0 until parallelism).map(index =>
        gearpumpTaskIdToStorm(TaskId(processorId, index)) -> component)
    }
    taskToComponent
  }

  // Workarounds to support storm ShellBolt
  /**
   * Creates a "pid" directory under the system temp directory.
   * Failure is logged (with its cause) and the path is returned regardless.
   */
  private def mkPidDir: String = {
    val pidDir = FileUtils.getTempDirectoryPath + File.separator + "pid"
    try {
      FileUtils.forceMkdir(new File(pidDir))
    } catch {
      case ex: IOException =>
        LOG.error(s"failed to create pid directory $pidDir", ex)
    }
    pidDir
  }

  // a workaround to support storm ShellBolt
  /**
   * Extracts the last jar on "java.class.path" into a "storm" directory under the
   * system temp directory and returns the path of its "resources" sub-directory.
   *
   * The classpath is split on the platform path separator. The jar and the
   * per-entry streams are always closed. I/O failures are logged with their cause
   * and the path is returned regardless.
   */
  private def mkCodeDir: String = {
    // File.pathSeparatorChar is ':' on Unix-like systems and ';' on Windows
    val jarPath = System.getProperty("java.class.path").split(File.pathSeparatorChar).last
    val destDir = FileUtils.getTempDirectoryPath + File.separator + "storm"

    try {
      FileUtils.forceMkdir(new File(destDir))

      val jar = new JarFile(jarPath)
      try {
        jar.entries().asScala.foreach { entry =>
          if (!entry.isDirectory) {
            val file = new File(destDir + File.separator + entry.getName)
            file.getParentFile.mkdirs()

            val is = jar.getInputStream(entry)
            try {
              val fos = new FileOutputStream(file)
              try {
                IOUtils.copy(is, fos)
              } catch {
                case ex: IOException =>
                  // a failed copy of one entry does not abort the extraction
                  LOG.error(s"failed to copy data from ${entry.getName} to ${file.getName}", ex)
              } finally {
                fos.close()
              }
            } finally {
              // runs even when the output stream could not be opened
              is.close()
            }
          }
        }
      } finally {
        jar.close()
      }
    } catch {
      case ex: IOException =>
        LOG.error(s"could not extract $destDir from $jarPath", ex)
    }

    destDir + File.separator + "resources"
  }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopology.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopology.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopology.scala new file mode 100644 index 0000000..62bc25c --- /dev/null +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopology.scala @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.gearpump.experiments.storm.topology

import java.lang.{Iterable => JIterable}
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}

import akka.actor.ActorSystem
import backtype.storm.Config
import backtype.storm.generated._
import backtype.storm.utils.{ThriftTopologyUtils, Utils}
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.experiments.storm.processor.StormProcessor
import org.apache.gearpump.experiments.storm.producer.StormProducer
import org.apache.gearpump.experiments.storm.util.StormConstants._
import org.apache.gearpump.experiments.storm.util.StormUtil
import org.apache.gearpump.experiments.storm.util.StormUtil._
import org.apache.gearpump.streaming.Processor
import org.apache.gearpump.streaming.task.Task
import org.apache.gearpump.util.LogUtil
import org.slf4j.Logger

// TODO: Refactor this file, we should disable using of JavaConversions
// scalastyle:off javaconversions
import scala.collection.JavaConversions._
// scalastyle:on javaconversions

object GearpumpStormTopology {
  private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpStormTopology])

  /**
   * Creates a topology wrapper whose system config is read through
   * `Utils.readStormConfig()` and whose application config is parsed from JSON.
   *
   * @param name topology name
   * @param topology Storm topology
   * @param appConfigInJson application config as a JSON string
   */
  def apply(
      name: String,
      topology: StormTopology,
      appConfigInJson: String)(implicit system: ActorSystem): GearpumpStormTopology = {
    new GearpumpStormTopology(
      name,
      topology,
      Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]],
      parseJsonStringToMap(appConfigInJson)
    )
  }
}

/**
 * this is a wrapper over Storm topology which
 *   1. merges Storm and Gearpump configs
 *   2. creates Gearpump processors
 *   3. provides interface for Gearpump applications to use Storm topology
 *
 * an implicit ActorSystem is required to create Gearpump processors
 * @param name topology name
 * @param topology Storm topology
 * @param sysConfig configs from "defaults.yaml" and custom config file
 * @param appConfig config submitted from user application
 */
private[storm] class GearpumpStormTopology(
    name: String,
    topology: StormTopology,
    sysConfig: JMap[AnyRef, AnyRef],
    appConfig: JMap[AnyRef, AnyRef])(implicit system: ActorSystem) {

  private val spouts = topology.get_spouts()
  private val bolts = topology.get_bolts()
  private val stormConfig = mergeConfigs(sysConfig, appConfig, getComponentConfigs(spouts, bolts))
  private val spoutProcessors = spouts.map { case (id, spout) =>
    id -> spoutToProcessor(id, spout, stormConfig.toMap)
  }.toMap
  private val boltProcessors = bolts.map { case (id, bolt) =>
    id -> boltToProcessor(id, bolt, stormConfig.toMap)
  }.toMap
  private val allProcessors = spoutProcessors ++ boltProcessors

  /**
   * @return merged Storm config with priority
   *         defaults.yaml < custom file config < application config < component config
   */
  def getStormConfig: JMap[AnyRef, AnyRef] = stormConfig

  /**
   * @return Storm components to Gearpump processors
   */
  def getProcessors: Map[String, Processor[Task]] = allProcessors

  /**
   * @param sourceId source component id
   * @return target Storm components and Gearpump processors
   */
  def getTargets(sourceId: String): Map[String, Processor[Task]] = {
    getTargets(sourceId, topology).map { case (targetId, _) =>
      targetId -> boltProcessors(targetId)
    }
  }

  /**
   * merge configs from application, custom config file and component
   */
  private def mergeConfigs(
      sysConfig: JMap[AnyRef, AnyRef],
      appConfig: JMap[AnyRef, AnyRef],
      componentConfigs: Iterable[JMap[AnyRef, AnyRef]]): JMap[AnyRef, AnyRef] = {
    val allConfig = new JHashMap[AnyRef, AnyRef]
    allConfig.putAll(sysConfig)
    allConfig.putAll(appConfig)
    allConfig.putAll(getMergedComponentConfig(componentConfigs, allConfig.toMap))
    allConfig.put(Config.TOPOLOGY_NAME, name)
    allConfig
  }

  /**
   * creates Gearpump processor from Storm spout
   * @param spoutId spout id
   * @param spoutSpec spout spec
   * @param stormConfig merged storm config
   * @param system actor system
   * @return a Processor[StormProducer]
   */
  private def spoutToProcessor(spoutId: String, spoutSpec: SpoutSpec,
      stormConfig: Map[AnyRef, AnyRef])(implicit system: ActorSystem): Processor[Task] = {
    val componentCommon = spoutSpec.get_common()
    val taskConf = UserConfig.empty
      .withString(STORM_COMPONENT, spoutId)
    val parallelism = getParallelism(stormConfig, componentCommon)
    Processor[StormProducer](parallelism, spoutId, taskConf)
  }

  /**
   * creates Gearpump processor from Storm bolt
   * @param boltId bolt id
   * @param boltSpec bolt spec
   * @param stormConfig merged storm config
   * @param system actor system
   * @return a Processor[StormProcessor]
   */
  private def boltToProcessor(boltId: String, boltSpec: Bolt,
      stormConfig: Map[AnyRef, AnyRef])(implicit system: ActorSystem): Processor[Task] = {
    val componentCommon = boltSpec.get_common()
    val taskConf = UserConfig.empty
      .withString(STORM_COMPONENT, boltId)
      .withBoolean("state.checkpoint.enable", StormUtil.ackEnabled(stormConfig))
    val parallelism = getParallelism(stormConfig, componentCommon)
    Processor[StormProcessor](parallelism, boltId, taskConf)
  }

  /**
   * @return target components and streams
   */
  private def getTargets(componentId: String, topology: StormTopology)
    : Map[String, Map[String, Grouping]] = {
    val componentIds = ThriftTopologyUtils.getComponentIds(topology)
    componentIds.flatMap { otherComponentId =>
      getInputs(otherComponentId, topology).toList.map(otherComponentId -> _)
    }.foldLeft(Map.empty[String, Map[String, Grouping]]) {
      (allTargets, componentAndInput) =>
        val (otherComponentId, (globalStreamId, grouping)) = componentAndInput
        val inputStreamId = globalStreamId.get_streamId()
        val inputComponentId = globalStreamId.get_componentId
        // keep only the inputs that are fed by `componentId`
        if (inputComponentId.equals(componentId)) {
          val curr = allTargets.getOrElse(otherComponentId, Map.empty[String, Grouping])
          allTargets + (otherComponentId -> (curr + (inputStreamId -> grouping)))
        } else {
          allTargets
        }
    }
  }

  /**
   * @return input stream and grouping for a Storm component
   */
  private def getInputs(componentId: String, topology: StormTopology)
    : JMap[GlobalStreamId, Grouping] = {
    ThriftTopologyUtils.getComponentCommon(topology, componentId).get_inputs
  }

  /**
   * get Storm component parallelism according to the following rule,
   *   1. use "topology.tasks" if defined; otherwise use parallelism_hint
   *   2. parallelism should not be larger than "topology.max.task.parallelism" if defined
   *   3. component config overrides system config
   * @param stormConfig System configs without merging "topology.tasks" and
   *                    "topology.max.task.parallelism" of component
   * @return number of task instances for a component
   */
  private def getParallelism(stormConfig: Map[AnyRef, AnyRef], component: ComponentCommon): Int = {
    val parallelismHint: Int = if (component.is_set_parallelism_hint()) {
      component.get_parallelism_hint()
    } else {
      1
    }
    val mergedConfig = new JHashMap[AnyRef, AnyRef]
    val componentConfig = parseJsonStringToMap(component.get_json_conf)
    mergedConfig.putAll(stormConfig)
    mergedConfig.putAll(componentConfig)
    val numTasks: Int = getInt(mergedConfig, Config.TOPOLOGY_TASKS).getOrElse(parallelismHint)
    val parallelism: Int = getInt(mergedConfig, Config.TOPOLOGY_MAX_TASK_PARALLELISM)
      .fold(numTasks)(p => math.min(p, numTasks))
    parallelism
  }

  /** Parses the JSON config of every spout and bolt. */
  private def getComponentConfigs(spouts: JMap[String, SpoutSpec],
      bolts: JMap[String, Bolt]): Iterable[JMap[AnyRef, AnyRef]] = {
    spouts.map { case (id, spoutSpec) =>
      parseJsonStringToMap(spoutSpec.get_common().get_json_conf())
    } ++ bolts.map { case (id, boltSpec) =>
      parseJsonStringToMap(boltSpec.get_common().get_json_conf())
    }
  }

  /**
   * merge component configs "topology.kryo.decorators" and "topology.kryo.register"
   * @param componentConfigs list of component configs
   * @param allConfig existing configs without merging component configs
   * @return the two configs merged from all the component configs and existing configs
   */
  private def getMergedComponentConfig(componentConfigs: Iterable[JMap[AnyRef, AnyRef]],
      allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = {
    val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
    mergedConfig.putAll(getMergedKryoDecorators(componentConfigs, allConfig))
    mergedConfig.putAll(getMergedKryoRegister(componentConfigs, allConfig))
    mergedConfig
  }

  /**
   * @param componentConfigs list of component configs
   * @param allConfig existing configs without merging component configs
   * @return a merged config with a list of distinct kryo decorators from component and
   *         existing configs
   * @throws IllegalArgumentException if a config value is neither null nor an Iterable
   *                                  of Strings
   */
  private def getMergedKryoDecorators(componentConfigs: Iterable[JMap[AnyRef, AnyRef]],
      allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = {
    val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef](1)
    val key = Config.TOPOLOGY_KRYO_DECORATORS
    val configs = getConfigValues(componentConfigs, allConfig, key)
    val distincts = configs.foldLeft(Set.empty[String]) {
      case (accum, config: JIterable[_]) =>
        accum ++ config.map {
          case s: String => s
          case illegal =>
            throw new IllegalArgumentException(s"$key must be a List of Strings; actually $illegal")
        }
      case (accum, null) =>
        accum
      // bind only the offending value so the message does not print the accumulator
      case (_, illegal) =>
        throw new IllegalArgumentException(s"$key must be a List of Strings; actually $illegal")
    }
    if (distincts.nonEmpty) {
      val decorators: JList[String] = new JArrayList(distincts.size)
      decorators.addAll(distincts)
      mergedConfig.put(key, decorators)
    }
    mergedConfig
  }

  /**
   * @param componentConfigs list of component configs
   * @param allConfig existing configs without merging component configs
   * @return a merged config of kryo registrations; values are folded in the order
   *         returned by `getConfigValues` (component configs first, existing config
   *         last), so on a key collision the later value wins
   * @throws IllegalArgumentException if a config value is neither null nor an Iterable
   *                                  of Strings or Maps of Strings
   */
  private def getMergedKryoRegister(componentConfigs: Iterable[JMap[AnyRef, AnyRef]],
      allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = {
    val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef](1)
    val key = Config.TOPOLOGY_KRYO_REGISTER
    val configs = getConfigValues(componentConfigs, allConfig, key)
    val merged = configs.foldLeft(Map.empty[String, String]) {
      case (accum, config: JIterable[_]) =>
        accum ++ config.map {
          case m: JMap[_, _] =>
            m.map {
              case (k: String, v: String) => k -> v
              case illegal =>
                throw new IllegalArgumentException(
                  s"each element of $key must be a String or a Map of Strings; actually $illegal")
            }
          case s: String =>
            Map(s -> null)
          case illegal =>
            throw new IllegalArgumentException(s"each element of $key must be a String or " +
              s"a Map of Strings; actually $illegal")
        // foldLeft instead of reduce: an empty registration list contributes nothing
        // rather than throwing UnsupportedOperationException
        }.foldLeft(Map.empty[String, String])(_ ++ _)
      case (accum, null) =>
        accum
      case (accum, illegal) =>
        throw new IllegalArgumentException(
          s"$key must be an Iterable containing only Strings or Maps of Strings; actually $illegal")
    }
    if (merged.nonEmpty) {
      val registers: JMap[String, String] = new JHashMap[String, String](merged.size)
      registers.putAll(merged)
      mergedConfig.put(key, registers)
    }
    mergedConfig
  }

  /**
   * @param componentConfigs list of raw component configs
   * @param allConfig existing configs without merging component configs
   * @param key config key
   * @return a list of values for a config from both component configs and existing configs
   */
  private def getConfigValues(componentConfigs: Iterable[JMap[AnyRef, AnyRef]],
      allConfig: Map[AnyRef, AnyRef], key: String): Iterable[AnyRef] = {
    componentConfigs.map(config => config.get(key)) ++ allConfig.get(key).toList
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala new file mode 100644 index 0000000..eb61acb --- /dev/null +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.experiments.storm.topology + +import java.util.{List => JList} + +import backtype.storm.task.GeneralTopologyContext +import backtype.storm.tuple.{Tuple, TupleImpl} + +import org.apache.gearpump.TimeStamp + +/** + * this carries Storm tuple values in the Gearpump world + * the targetPartitions field dictate which tasks a GearpumpTuple should be sent to + * see [[org.apache.gearpump.experiments.storm.partitioner.StormPartitioner]] for more info + */ +private[storm] class GearpumpTuple( + val values: JList[AnyRef], + val sourceTaskId: Integer, + val sourceStreamId: String, + @transient val targetPartitions: Map[String, Array[Int]]) extends Serializable { + /** + * creates a Storm [[backtype.storm.tuple.Tuple]] to be passed to a Storm component + * this is needed for each incoming message + * because we cannot get [[backtype.storm.task.GeneralTopologyContext]] at deserialization + * @param topologyContext topology context used for all tasks + * @return a Tuple + */ + def toTuple(topologyContext: GeneralTopologyContext, timestamp: TimeStamp): Tuple = { + TimedTuple(topologyContext, values, sourceTaskId, sourceStreamId, timestamp) + } + + def canEqual(other: Any): Boolean = other.isInstanceOf[GearpumpTuple] + + override def equals(other: Any): Boolean = other match { + case that: GearpumpTuple => + (that canEqual this) && + values == that.values && + 
sourceTaskId == that.sourceTaskId && + sourceStreamId == that.sourceStreamId + case _ => false + } + + override def hashCode(): Int = { + val state = Seq(values, sourceTaskId, sourceStreamId) + state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b) + } +} + +case class TimedTuple(topologyContext: GeneralTopologyContext, tuple: JList[AnyRef], + sourceTaskId: Integer, sourceStreamId: String, timestamp: TimeStamp) + extends TupleImpl(topologyContext, tuple, sourceTaskId, sourceStreamId, null) + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala new file mode 100644 index 0000000..777acab --- /dev/null +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/GraphBuilder.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.experiments.storm.util + +import org.apache.gearpump.experiments.storm.partitioner.StormPartitioner +import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopology +import org.apache.gearpump.partitioner.Partitioner +import org.apache.gearpump.streaming.Processor +import org.apache.gearpump.streaming.task.Task +import org.apache.gearpump.util.Graph + +object GraphBuilder { + + /** + * build a Gearpump DAG from a Storm topology + * @param topology a wrapper over Storm topology + * @return a DAG + */ + def build(topology: GearpumpStormTopology): Graph[Processor[_ <: Task], _ <: Partitioner] = { + val processorGraph = Graph.empty[Processor[Task], Partitioner] + + topology.getProcessors.foreach { case (sourceId, sourceProcessor) => + topology.getTargets(sourceId).foreach { case (targetId, targetProcessor) => + processorGraph.addEdge(sourceProcessor, new StormPartitioner(targetId), targetProcessor) + } + } + + processorGraph + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/Grouper.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/Grouper.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/Grouper.scala new file mode 100644 index 0000000..1d04af6 --- /dev/null +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/Grouper.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.experiments.storm.util + +import java.util.{List => JList} +import scala.util.Random + +import backtype.storm.generated.GlobalStreamId +import backtype.storm.grouping.CustomStreamGrouping +import backtype.storm.task.TopologyContext +import backtype.storm.tuple.Fields + +/** + * Grouper is identical to that in storm but return gearpump partitions for storm tuple values + */ +sealed trait Grouper { + /** + * @param taskId storm task id of source task + * @param values storm tuple values + * @return a list of gearpump partitions + */ + def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] +} + +/** + * GlobalGrouper always returns partition 0 + */ +class GlobalGrouper extends Grouper { + override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = Array(0) +} + +/** + * NoneGrouper randomly returns partition + * + * @param numTasks number of target tasks + */ +class NoneGrouper(numTasks: Int) extends Grouper { + private val random = new Random + + override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = { + val partition = StormUtil.mod(random.nextInt, numTasks) + Array(partition) + } +} + +/** + * ShuffleGrouper shuffles partitions and returns them sequentially, and then shuffles again + * + * @param numTasks number of target tasks + */ +class ShuffleGrouper(numTasks: Int) extends Grouper { + private val random = new Random + private var index = -1 + private var partitions = List.empty[Int] + + override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = { 
+ index += 1 + if (partitions.isEmpty) { + partitions = 0.until(numTasks).toList + partitions = random.shuffle(partitions) + } else if (index >= numTasks) { + index = 0 + partitions = random.shuffle(partitions) + } + Array(partitions(index)) + } +} + +/** + * FieldsGrouper returns partition based on value of groupFields + * + * @param outFields declared output fields of source task + * @param groupFields grouping fields of target tasks + * @param numTasks number of target tasks + */ +class FieldsGrouper(outFields: Fields, groupFields: Fields, numTasks: Int) extends Grouper { + + override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = { + val hash = outFields.select(groupFields, values).hashCode() + val partition = StormUtil.mod(hash, numTasks) + Array(partition) + } +} + +/** + * AllGrouper returns all partitions + * + * @param numTasks number of target tasks + */ +class AllGrouper(numTasks: Int) extends Grouper { + val partitions = (0 until numTasks).toArray + + override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = { + partitions + } +} + +/** + * CustomGrouper allows users to specify grouping strategy + * + * @param grouping see [[backtype.storm.grouping.CustomStreamGrouping]] + */ +class CustomGrouper(grouping: CustomStreamGrouping) extends Grouper { + + def prepare( + topologyContext: TopologyContext, globalStreamId: GlobalStreamId, targetTasks: JList[Integer]) + : Unit = { + grouping.prepare(topologyContext, globalStreamId, targetTasks) + } + + override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = { + val tasks = grouping.chooseTasks(taskId, values) + val result = new Array[Int](tasks.size()) + + val iter = tasks.iterator() + + var index = 0 + while (iter.hasNext()) { + val value = iter.next() + result(index) = StormUtil.stormTaskIdToGearpump(value).index + index += 1 + } + result + } +} + 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormConstants.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormConstants.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormConstants.scala new file mode 100644 index 0000000..f1c736c --- /dev/null +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormConstants.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.experiments.storm.util + +object StormConstants { + val STORM_COMPONENT = "storm_component" + val STORM_TOPOLOGY = "storm_topology" + val STORM_CONFIG = "storm_config" + val SYSTEM_COMPONENT_ID = "__system" + val SYSTEM_COMPONENT_OUTPUT_FIELDS = "rate_secs" + val SYSTEM_TASK_ID: Integer = -1 + val SYSTEM_TICK_STREAM_ID = "__tick" + + val CHECKPOINT_INTERVAL_MILLIS = 2000 // 2 seconds + + val STORM_SERIALIZATION_FRAMEWORK = "gearpump.storm.serialization-framework" +}
