http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala deleted file mode 100644 index aca2736..0000000 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala +++ /dev/null @@ -1,289 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.storm.main - -import java.io.{File, FileOutputStream, FileWriter} -import java.nio.ByteBuffer -import java.nio.channels.{Channels, WritableByteChannel} -import java.util.{HashMap => JHashMap, Map => JMap, UUID} -import scala.collection.JavaConverters._ -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, Future} - -import akka.actor.ActorSystem -import com.typesafe.config.ConfigValueFactory -import backtype.storm.Config -import backtype.storm.generated._ -import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer} -import backtype.storm.utils.Utils -import org.apache.storm.shade.org.json.simple.JSONValue -import org.apache.storm.shade.org.yaml.snakeyaml.Yaml -import org.slf4j.Logger - -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import io.gearpump.cluster.{MasterToAppMaster, UserConfig} -import io.gearpump.experiments.storm.topology.GearpumpStormTopology -import io.gearpump.experiments.storm.util.TimeCacheMapWrapper.Callback -import io.gearpump.experiments.storm.util.{GraphBuilder, StormConstants, StormUtil, TimeCacheMapWrapper} -import io.gearpump.streaming.StreamApplication -import io.gearpump.util.{AkkaApp, Constants, LogUtil} - -object GearpumpNimbus extends AkkaApp with ArgumentsParser { - private val THRIFT_PORT = StormUtil.getThriftPort() - private val OUTPUT = "output" - private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpNimbus]) - - override val options: Array[(String, CLIOption[Any])] = Array( - OUTPUT -> CLIOption[String]("<output path for configuration file>", - required = false, defaultValue = Some("app.yaml")) - ) - - override def main(inputAkkaConf: Config, args: Array[String]): Unit = { - val parsed = parse(args) - val output = parsed.getString(OUTPUT) - val akkaConf = updateClientConfig(inputAkkaConf) - val system = ActorSystem("storm", akkaConf) - - val clientContext = new ClientContext(akkaConf, 
system, null) - val stormConf = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]] - val thriftConf: JMap[AnyRef, AnyRef] = Map( - Config.NIMBUS_HOST -> akkaConf.getString(Constants.GEARPUMP_HOSTNAME), - Config.NIMBUS_THRIFT_PORT -> s"$THRIFT_PORT").asJava.asInstanceOf[JMap[AnyRef, AnyRef]] - updateStormConfig(thriftConf, output) - stormConf.putAll(thriftConf) - - import scala.concurrent.ExecutionContext.Implicits.global - Future { - val thriftServer = createServer(clientContext, stormConf) - thriftServer.serve() - } - Await.result(system.whenTerminated, Duration.Inf) - } - - private def createServer( - clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]): ThriftServer = { - val processor = new Nimbus.Processor[GearpumpNimbus](new GearpumpNimbus(clientContext, - stormConf)) - val connectionType = ThriftConnectionType.NIMBUS - new ThriftServer(stormConf, processor, connectionType) - } - - private def updateStormConfig(thriftConfig: JMap[AnyRef, AnyRef], output: String): Unit = { - val updatedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef] - val outputConfig = Utils.findAndReadConfigFile(output, false).asInstanceOf[JMap[AnyRef, AnyRef]] - updatedConfig.putAll(outputConfig) - updatedConfig.putAll(thriftConfig) - val yaml = new Yaml - val serialized = yaml.dumpAsMap(updatedConfig) - val writer = new FileWriter(new File(output)) - try { - writer.write(serialized) - } catch { - case e: Exception => throw e - } finally { - writer.close() - } - } - - import io.gearpump.util.Constants._ - private def updateClientConfig(config: Config): Config = { - val storm = s"<${GEARPUMP_HOME}>/lib/storm/*" - val appClassPath = s"$storm${File.pathSeparator}" + - config.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH) - val executorClassPath = s"$storm${File.pathSeparator}" + - config.getString(Constants.GEARPUMP_EXECUTOR_EXTRA_CLASSPATH) - - val updated = config - .withValue(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(appClassPath)) - 
.withValue(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH, - ConfigValueFactory.fromAnyRef(executorClassPath)) - - if (config.hasPath(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) { - val serializerConfig = ConfigValueFactory.fromAnyRef( - config.getString(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) - updated.withValue(GEARPUMP_SERIALIZER_POOL, serializerConfig) - } else { - updated - } - } -} - -class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]) - extends Nimbus.Iface { - - import io.gearpump.experiments.storm.main.GearpumpNimbus._ - - private var applications = Map.empty[String, Int] - private var topologies = Map.empty[String, TopologyData] - private val expireSeconds = StormUtil.getInt(stormConf, - Config.NIMBUS_FILE_COPY_EXPIRATION_SECS).get - private val expiredCallback = new Callback[String, WritableByteChannel] { - override def expire(k: String, v: WritableByteChannel): Unit = { - v.close() - } - } - private val fileCacheMap = new TimeCacheMapWrapper[String, WritableByteChannel](expireSeconds, - expiredCallback) - - override def submitTopology( - name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology) - : Unit = { - submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, - new SubmitOptions(TopologyInitialStatus.ACTIVE)) - } - - override def submitTopologyWithOpts( - name: String, uploadedJarLocation: String, - jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit = { - LOG.info(s"Submitted topology $name") - implicit val system = clientContext.system - val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf) - val stormConfig = gearpumpStormTopology.getStormConfig - val workerNum = StormUtil.getInt(stormConfig, Config.TOPOLOGY_WORKERS).getOrElse(1) - val processorGraph = GraphBuilder.build(gearpumpStormTopology) - val config = UserConfig.empty - .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology) - .withValue[JMap[AnyRef, 
AnyRef]](StormConstants.STORM_CONFIG, stormConfig) - val app = StreamApplication(name, processorGraph, config) - LOG.info(s"jar file uploaded to $uploadedJarLocation") - val appId = clientContext.submit(app, uploadedJarLocation, workerNum) - applications += name -> appId - topologies += name -> TopologyData(topology, stormConfig, uploadedJarLocation) - LOG.info(s"Storm Application $appId submitted") - } - - override def killTopologyWithOpts(name: String, options: KillOptions): Unit = { - if (applications.contains(name)) { - clientContext.shutdown(applications(name)) - removeTopology(name) - LOG.info(s"Killed topology $name") - } else { - throw new RuntimeException(s"topology $name not found") - } - } - - override def getNimbusConf: String = { - JSONValue.toJSONString(stormConf) - } - - override def getTopology(name: String): StormTopology = { - updateApps() - topologies.getOrElse(name, - throw new RuntimeException(s"topology $name not found")).topology - } - - override def getTopologyConf(name: String): String = { - updateApps() - JSONValue.toJSONString(topologies.getOrElse(name, - throw new RuntimeException(s"topology $name not found")).config) - } - - override def getUserTopology(id: String): StormTopology = getTopology(id) - - override def beginFileUpload(): String = { - val file = File.createTempFile(s"storm-jar-${UUID.randomUUID()}", ".jar") - val location = file.getAbsolutePath - val channel = Channels.newChannel(new FileOutputStream(location)) - fileCacheMap.put(location, channel) - LOG.info(s"Uploading file from client to $location") - location - } - - override def uploadChunk(location: String, chunk: ByteBuffer): Unit = { - if (!fileCacheMap.containsKey(location)) { - throw new RuntimeException(s"File for $location does not exist (or timed out)") - } else { - val channel = fileCacheMap.get(location) - channel.write(chunk) - fileCacheMap.put(location, channel) - } - } - - override def finishFileUpload(location: String): Unit = { - if 
(!fileCacheMap.containsKey(location)) { - throw new RuntimeException(s"File for $location does not exist (or timed out)") - } else { - val channel = fileCacheMap.get(location) - channel.close() - fileCacheMap.remove(location) - } - } - - override def getClusterInfo: ClusterSummary = { - updateApps() - val topologySummaryList = topologies.map { case (name, _) => - new TopologySummary(name, name, 0, 0, 0, 0, "") - }.toSeq - new ClusterSummary(List[SupervisorSummary]().asJava, 0, topologySummaryList.asJava) - } - - override def beginFileDownload(file: String): String = { - throw new UnsupportedOperationException - } - - override def uploadNewCredentials(s: String, credentials: Credentials): Unit = { - throw new UnsupportedOperationException - } - override def activate(name: String): Unit = { - throw new UnsupportedOperationException - } - - override def rebalance(name: String, options: RebalanceOptions): Unit = { - throw new UnsupportedOperationException - } - - override def deactivate(name: String): Unit = { - throw new UnsupportedOperationException - } - - override def getTopologyInfo(name: String): TopologyInfo = { - throw new UnsupportedOperationException - } - - override def getTopologyInfoWithOpts(s: String, getInfoOptions: GetInfoOptions): TopologyInfo = { - throw new UnsupportedOperationException - } - - override def killTopology(name: String): Unit = killTopologyWithOpts(name, new KillOptions()) - - override def downloadChunk(name: String): ByteBuffer = { - throw new UnsupportedOperationException - } - - private def updateApps(): Unit = { - clientContext.listApps.appMasters.foreach { app => - val name = app.appName - if (applications.contains(name)) { - if (app.status != MasterToAppMaster.AppMasterActive) { - removeTopology(name) - } - } - } - } - - private def removeTopology(name: String): Unit = { - applications -= name - val jar = topologies(name).jar - new File(jar).delete() - topologies -= name - } -} - -case class TopologyData(topology: StormTopology, 
config: JMap[AnyRef, AnyRef], jar: String)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala deleted file mode 100644 index fbdd579..0000000 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.storm.main - -import backtype.storm.Config -import backtype.storm.utils.Utils - -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import io.gearpump.util.Constants._ -import io.gearpump.util.{AkkaApp, LogUtil, Util} - -object GearpumpStormClient extends AkkaApp with ArgumentsParser { - - override val options: Array[(String, CLIOption[Any])] = Array( - "jar" -> CLIOption[String]("<storm jar>", required = true), - "config" -> CLIOption[Int]("<storm config file>", required = true), - "verbose" -> CLIOption("<print verbose log on console>", required = false, - defaultValue = Some(false))) - - override def main(inputAkkaConf: Config, args: Array[String]): Unit = { - val config = parse(args) - - val verbose = config.getBoolean("verbose") - if (verbose) { - LogUtil.verboseLogToConsole() - } - - val jar = config.getString("jar") - val stormConfig = config.getString("config") - val topology = config.remainArgs(0) - val stormArgs = config.remainArgs.drop(1) - val stormOptions = Array( - s"-Dstorm.options=${getThriftOptions(stormConfig)}", - s"-Dstorm.jar=$jar", - s"-Dstorm.conf.file=$stormConfig", - s"-D${PREFER_IPV4}=true" - ) - - val gearpumpHome = System.getProperty(GEARPUMP_HOME) - val classPath = Array(s"$gearpumpHome/lib/*", s"$gearpumpHome/lib/storm/*", jar) - val process = Util.startProcess(stormOptions, classPath, topology, stormArgs) - - // Waits till the process exit - val exit = process.exitValue() - - if (exit != 0) { - throw new Exception(s"failed to submit jar, exit code $exit, " + - s"error summary: ${process.logger.error}") - } - } - - private def getThriftOptions(stormConfig: String): String = { - val config = Utils.findAndReadConfigFile(stormConfig, true) - val host = config.get(Config.NIMBUS_HOST) - val thriftPort = config.get(Config.NIMBUS_THRIFT_PORT) - s"${Config.NIMBUS_HOST}=$host,${Config.NIMBUS_THRIFT_PORT}=$thriftPort" - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala deleted file mode 100644 index e47e11d..0000000 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.storm.partitioner - -import io.gearpump.Message -import io.gearpump.experiments.storm.topology.GearpumpTuple -import io.gearpump.partitioner.{MulticastPartitioner, Partitioner} - -/** - * Partitioner bound to a target Storm component - * - * Partitioning is already done in [[io.gearpump.experiments.storm.util.StormOutputCollector]] and - * kept in "targetPartitions" of [[io.gearpump.experiments.storm.topology.GearpumpTuple]] - * the partitioner just returns the partitions of the target - * - * In Gearpump, a message is sent from a task to all the subscribers. - * In Storm, however, a message is sent to one or more of the subscribers. - * Hence, we have to do the partitioning in - * [[io.gearpump.experiments.storm.util.StormOutputCollector]] till the Storm way is supported - * in Gearpump - * - * @param target target storm component id - */ -private[storm] class StormPartitioner(target: String) extends MulticastPartitioner { - - override def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int) - : Array[Int] = { - val stormTuple = msg.msg.asInstanceOf[GearpumpTuple] - stormTuple.targetPartitions.getOrElse(target, Array(Partitioner.UNKNOWN_PARTITION_ID)) - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala deleted file mode 100644 index 5920bf8..0000000 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.storm.processor - -import java.util.{Collection => JCollection, List => JList} - -import backtype.storm.task.IOutputCollector -import backtype.storm.tuple.Tuple - -import io.gearpump.experiments.storm.topology.TimedTuple -import io.gearpump.experiments.storm.util.StormConstants._ -import io.gearpump.experiments.storm.util.StormOutputCollector -import io.gearpump.streaming.task.ReportCheckpointClock - -/** - * this is used by Storm bolt to emit messages - */ -private[storm] class StormBoltOutputCollector(collector: StormOutputCollector, - ackEnabled: Boolean = false) extends IOutputCollector { - private var reportTime = 0L - private var maxAckTime = 0L - - override def emit( - streamId: String, anchors: JCollection[Tuple], tuple: JList[AnyRef]): JList[Integer] = { - collector.emit(streamId, tuple) - } - - override def emitDirect( - taskId: Int, streamId: String, anchors: JCollection[Tuple], tuple: JList[AnyRef]): Unit = { - collector.emitDirect(taskId, streamId, tuple) - } - - override def fail(tuple: Tuple): Unit = { - // application failure, throw exception such that the tuple can be replayed - // Note: do not print the tuple which will trigger NPE since its messageId is null - throw new Exception("Storm Bolt.execute failed") 
- } - - override def ack(tuple: Tuple): Unit = { - if (ackEnabled) { - tuple match { - case timedTuple: TimedTuple => - maxAckTime = Math.max(maxAckTime, timedTuple.timestamp) - val taskContext = collector.taskContext - val upstreamMinClock = taskContext.upstreamMinClock - if (reportTime <= upstreamMinClock && upstreamMinClock <= maxAckTime) { - reportTime = upstreamMinClock / CHECKPOINT_INTERVAL_MILLIS * CHECKPOINT_INTERVAL_MILLIS - taskContext.appMaster ! ReportCheckpointClock(taskContext.taskId, reportTime) - reportTime += CHECKPOINT_INTERVAL_MILLIS - } - case _ => - // ignore other tuples - } - } - } - - override def reportError(throwable: Throwable): Unit = { - throw throwable - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormProcessor.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormProcessor.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormProcessor.scala deleted file mode 100644 index 67548b3..0000000 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormProcessor.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.storm.processor - -import java.util.concurrent.TimeUnit -import scala.concurrent.duration.Duration - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt -import io.gearpump.experiments.storm.util._ -import io.gearpump.streaming.task._ - -object StormProcessor { - private[storm] val TICK = Message("tick") -} - -/** - * this is runtime container for Storm bolt - */ -private[storm] class StormProcessor(gearpumpBolt: GearpumpBolt, - taskContext: TaskContext, conf: UserConfig) - extends Task(taskContext, conf) { - import io.gearpump.experiments.storm.processor.StormProcessor._ - - def this(taskContext: TaskContext, conf: UserConfig) = { - this(StormUtil.getGearpumpStormComponent(taskContext, conf)(taskContext.system) - .asInstanceOf[GearpumpBolt], taskContext, conf) - } - - private val freqOpt = gearpumpBolt.getTickFrequency - - override def onStart(startTime: StartTime): Unit = { - gearpumpBolt.start(startTime) - freqOpt.foreach(scheduleTick) - } - - override def onNext(message: Message): Unit = { - message match { - case TICK => - freqOpt.foreach { freq => - gearpumpBolt.tick(freq) - scheduleTick(freq) - } - case _ => - gearpumpBolt.next(message) - } - } - - private def scheduleTick(freq: Int): Unit = { - taskContext.scheduleOnce(Duration(freq, TimeUnit.SECONDS)) { - self ! 
TICK - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormProducer.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormProducer.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormProducer.scala deleted file mode 100644 index 39882c4..0000000 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormProducer.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.storm.producer - -import java.util.concurrent.TimeUnit -import scala.concurrent.duration.Duration - -import akka.actor.Actor.Receive - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout -import io.gearpump.experiments.storm.util._ -import io.gearpump.streaming.task._ - -object StormProducer { - private[storm] val TIMEOUT = Message("timeout") -} - -/** - * this is runtime container for Storm spout - */ -private[storm] class StormProducer(gearpumpSpout: GearpumpSpout, - taskContext: TaskContext, conf: UserConfig) - extends Task(taskContext, conf) { - import io.gearpump.experiments.storm.producer.StormProducer._ - - def this(taskContext: TaskContext, conf: UserConfig) = { - this(StormUtil.getGearpumpStormComponent(taskContext, conf)(taskContext.system) - .asInstanceOf[GearpumpSpout], taskContext, conf) - } - - private val timeoutMillis = gearpumpSpout.getMessageTimeout - - override def onStart(startTime: StartTime): Unit = { - gearpumpSpout.start(startTime) - if (gearpumpSpout.ackEnabled) { - getCheckpointClock - } - timeoutMillis.foreach(scheduleTimeout) - self ! Message("start") - } - - override def onNext(msg: Message): Unit = { - msg match { - case TIMEOUT => - timeoutMillis.foreach { timeout => - gearpumpSpout.timeout(timeout) - scheduleTimeout(timeout) - } - case _ => - gearpumpSpout.next(msg) - } - self ! Message("continue") - } - - override def receiveUnManagedMessage: Receive = { - case CheckpointClock(optClock) => - optClock.foreach { clock => - gearpumpSpout.checkpoint(clock) - } - getCheckpointClock() - } - - def getCheckpointClock(): Unit = { - taskContext.scheduleOnce(Duration(StormConstants.CHECKPOINT_INTERVAL_MILLIS, - TimeUnit.MILLISECONDS))(taskContext.appMaster ! 
GetCheckpointClock) - } - - private def scheduleTimeout(timeout: Long): Unit = { - taskContext.scheduleOnce(Duration(timeout, TimeUnit.MILLISECONDS)) { - self ! TIMEOUT - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala deleted file mode 100644 index 83532dc..0000000 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.storm.producer - -import java.util.{List => JList} - -import backtype.storm.spout.{ISpout, ISpoutOutputCollector} - -import io.gearpump.TimeStamp -import io.gearpump.experiments.storm.util.StormOutputCollector - -case class PendingMessage(id: Object, messageTime: TimeStamp, startTime: TimeStamp) - -/** - * this is used by Storm Spout to emit messages - */ -private[storm] class StormSpoutOutputCollector( - collector: StormOutputCollector, spout: ISpout, ackEnabled: Boolean) - extends ISpoutOutputCollector { - - private var checkpointClock = 0L - private var pendingMessage: Option[PendingMessage] = None - private var nextPendingMessage: Option[PendingMessage] = None - - override def emit(streamId: String, values: JList[AnyRef], messageId: Object): JList[Integer] = { - val curTime = System.currentTimeMillis() - collector.setTimestamp(curTime) - val outTasks = collector.emit(streamId, values) - setPendingOrAck(messageId, curTime, curTime) - outTasks - } - - override def reportError(throwable: Throwable): Unit = { - throw throwable - } - - override def emitDirect(taskId: Int, streamId: String, values: JList[AnyRef], messageId: Object) - : Unit = { - val curTime = System.currentTimeMillis() - collector.setTimestamp(curTime) - collector.emitDirect(taskId, streamId, values) - setPendingOrAck(messageId, curTime, curTime) - } - - def ackPendingMessage(checkpointClock: TimeStamp): Unit = { - this.checkpointClock = checkpointClock - nextPendingMessage.foreach { case PendingMessage(_, messageTime, _) => - if (messageTime <= this.checkpointClock) { - pendingMessage.foreach { case PendingMessage(id, _, _) => - spout.ack(id) - reset() - } - } - } - } - - def failPendingMessage(timeoutMillis: Long): Unit = { - pendingMessage.foreach { case PendingMessage(id, _, startTime) => - if (System.currentTimeMillis() - startTime >= timeoutMillis) { - spout.fail(id) - reset() - } - } - } - - private def reset(): Unit = { - pendingMessage = 
nextPendingMessage - nextPendingMessage = None - } - - private def setPendingOrAck(messageId: Object, startTime: TimeStamp, messageTime: TimeStamp) - : Unit = { - if (ackEnabled) { - val newPendingMessage = PendingMessage(messageId, messageTime, startTime) - pendingMessage match { - case Some(msg) => - if (nextPendingMessage.isEmpty && msg.messageTime <= this.checkpointClock) { - nextPendingMessage = Some(newPendingMessage) - } else { - spout.ack(messageId) - } - case None => - pendingMessage = Some(newPendingMessage) - } - } else { - spout.ack(messageId) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponent.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponent.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponent.scala deleted file mode 100644 index 42dce5f..0000000 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponent.scala +++ /dev/null @@ -1,388 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.storm.topology - -import java.io.{File, FileOutputStream, IOException} -import java.util -import java.util.jar.JarFile -import java.util.{HashMap => JHashMap, List => JList, Map => JMap} -import scala.collection.JavaConverters._ -import scala.concurrent.{Await, Future} - -import akka.actor.ActorRef -import akka.pattern.ask -import backtype.storm.Config -import backtype.storm.generated.{Bolt, ComponentCommon, SpoutSpec, StormTopology} -import backtype.storm.metric.api.IMetric -import backtype.storm.spout.{ISpout, SpoutOutputCollector} -import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, TopologyContext} -import backtype.storm.tuple.{Fields, Tuple, TupleImpl} -import backtype.storm.utils.Utils -import clojure.lang.Atom -import org.apache.commons.io.{FileUtils, IOUtils} -import org.slf4j.Logger - -import io.gearpump.experiments.storm.processor.StormBoltOutputCollector -import io.gearpump.experiments.storm.producer.StormSpoutOutputCollector -import io.gearpump.experiments.storm.util.StormConstants._ -import io.gearpump.experiments.storm.util.StormUtil._ -import io.gearpump.experiments.storm.util.{StormOutputCollector, StormUtil} -import io.gearpump.streaming.DAG -import io.gearpump.streaming.task._ -import io.gearpump.util.{Constants, LogUtil} -import io.gearpump.{Message, TimeStamp} - -/** - * subclass wraps Storm Spout and Bolt, and their lifecycles - * hides the complexity from Gearpump applications - */ -trait GearpumpStormComponent { - /** - * invoked at Task.onStart - * @param startTime task start time - */ - def start(startTime: StartTime): Unit - - /** - * invoked at Task.onNext - * @param message incoming message - */ - def next(message: Message): Unit - - /** - * invoked at Task.onStop - */ - def stop(): Unit = {} -} - -object GearpumpStormComponent { - private val LOG: Logger = 
LogUtil.getLogger(classOf[GearpumpStormComponent]) - - object GearpumpSpout { - def apply(topology: StormTopology, config: JMap[AnyRef, AnyRef], - spoutSpec: SpoutSpec, taskContext: TaskContext): GearpumpSpout = { - val componentCommon = spoutSpec.get_common() - val scalaMap = config.asScala.toMap // Convert to scala immutable map - val normalizedConfig = normalizeConfig(scalaMap, componentCommon) - val getTopologyContext = (dag: DAG, taskId: TaskId) => { - val stormTaskId = gearpumpTaskIdToStorm(taskId) - buildTopologyContext(dag, topology, normalizedConfig, stormTaskId) - } - val spout = Utils.getSetComponentObject(spoutSpec.get_spout_object()).asInstanceOf[ISpout] - val ackEnabled = StormUtil.ackEnabled(config) - if (ackEnabled) { - val className = spout.getClass.getName - if (!isSequentiallyReplayableSpout(className)) { - LOG.warn(s"at least once is not supported for $className") - } - } - val getOutputCollector = (taskContext: TaskContext, topologyContext: TopologyContext) => { - new StormSpoutOutputCollector( - StormOutputCollector(taskContext, topologyContext), spout, ackEnabled) - } - GearpumpSpout( - normalizedConfig, - spout, - askAppMasterForDAG, - getTopologyContext, - getOutputCollector, - ackEnabled, - taskContext) - } - - private def isSequentiallyReplayableSpout(className: String): Boolean = { - className.equals("storm.kafka.KafkaSpout") - } - } - - case class GearpumpSpout( - config: JMap[AnyRef, AnyRef], - spout: ISpout, - getDAG: ActorRef => DAG, - getTopologyContext: (DAG, TaskId) => TopologyContext, - getOutputCollector: (TaskContext, TopologyContext) => StormSpoutOutputCollector, - ackEnabled: Boolean, - taskContext: TaskContext) - extends GearpumpStormComponent { - - private var collector: StormSpoutOutputCollector = null - - override def start(startTime: StartTime): Unit = { - import taskContext.{appMaster, taskId} - - val dag = getDAG(appMaster) - val topologyContext = getTopologyContext(dag, taskId) - collector = 
getOutputCollector(taskContext, topologyContext) - spout.open(config, topologyContext, new SpoutOutputCollector(collector)) - } - - override def next(message: Message): Unit = { - spout.nextTuple() - } - - /** - * @return timeout in milliseconds if enabled - */ - def getMessageTimeout: Option[Long] = { - StormUtil.getBoolean(config, Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS).flatMap { - timeoutEnabled => - if (timeoutEnabled) { - StormUtil.getInt(config, Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).map(_ * 1000L) - } else { - None - } - } - } - - def checkpoint(clock: TimeStamp): Unit = { - collector.ackPendingMessage(clock) - } - - def timeout(timeoutMillis: Long): Unit = { - collector.failPendingMessage(timeoutMillis) - } - } - - object GearpumpBolt { - def apply(topology: StormTopology, config: JMap[AnyRef, AnyRef], - boltSpec: Bolt, taskContext: TaskContext): GearpumpBolt = { - val configAsScalaMap = config.asScala.toMap // Convert to scala immutable map - val normalizedConfig = normalizeConfig(configAsScalaMap, boltSpec.get_common()) - val getTopologyContext = (dag: DAG, taskId: TaskId) => { - val stormTaskId = gearpumpTaskIdToStorm(taskId) - buildTopologyContext(dag, topology, normalizedConfig, stormTaskId) - } - val getGeneralTopologyContext = (dag: DAG) => { - buildGeneralTopologyContext(dag, topology, normalizedConfig) - } - val getOutputCollector = (taskContext: TaskContext, topologyContext: TopologyContext) => { - StormOutputCollector(taskContext, topologyContext) - } - val getTickTuple = (topologyContext: GeneralTopologyContext, freq: Int) => { - - val values = new util.ArrayList[Object] // To be compatible with Java interface - values.add(freq.asInstanceOf[java.lang.Integer]) - new TupleImpl(topologyContext, values, SYSTEM_TASK_ID, SYSTEM_TICK_STREAM_ID, null) - } - GearpumpBolt( - normalizedConfig, - Utils.getSetComponentObject(boltSpec.get_bolt_object()).asInstanceOf[IBolt], - askAppMasterForDAG, - getTopologyContext, - getGeneralTopologyContext, - 
getOutputCollector, - getTickTuple, - taskContext) - } - } - - case class GearpumpBolt( - config: JMap[AnyRef, AnyRef], - bolt: IBolt, - getDAG: ActorRef => DAG, - getTopologyContext: (DAG, TaskId) => TopologyContext, - getGeneralTopologyContext: DAG => GeneralTopologyContext, - getOutputCollector: (TaskContext, TopologyContext) => StormOutputCollector, - getTickTuple: (GeneralTopologyContext, Int) => Tuple, - taskContext: TaskContext) - extends GearpumpStormComponent { - import taskContext.{appMaster, taskId} - - private var collector: StormOutputCollector = null - private var topologyContext: TopologyContext = null - private var generalTopologyContext: GeneralTopologyContext = null - private var tickTuple: Tuple = null - - override def start(startTime: StartTime): Unit = { - val dag = getDAG(appMaster) - topologyContext = getTopologyContext(dag, taskId) - generalTopologyContext = getGeneralTopologyContext(dag) - collector = getOutputCollector(taskContext, topologyContext) - val delegate = new StormBoltOutputCollector(collector, StormUtil.ackEnabled(config)) - bolt.prepare(config, topologyContext, new OutputCollector(delegate)) - } - - override def next(message: Message): Unit = { - val timestamp = message.timestamp - collector.setTimestamp(timestamp) - bolt.execute(message.msg.asInstanceOf[GearpumpTuple].toTuple(generalTopologyContext, - timestamp)) - } - - def getTickFrequency: Option[Int] = { - StormUtil.getInt(config, Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS) - } - - /** - * invoked at TICK message when "topology.tick.tuple.freq.secs" is configured - * @param freq tick frequency - */ - def tick(freq: Int): Unit = { - if (null == tickTuple) { - tickTuple = getTickTuple(generalTopologyContext, freq) - } - bolt.execute(tickTuple) - } - } - - /** - * normalize general config with per component configs - * "topology.transactional.id" and "topology.tick.tuple.freq.secs" - * @param stormConfig general config for all components - * @param componentCommon common component 
parts - */ - private def normalizeConfig(stormConfig: Map[AnyRef, AnyRef], - componentCommon: ComponentCommon): JMap[AnyRef, AnyRef] = { - val config: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef] - config.putAll(stormConfig.asJava) - val componentConfig = parseJsonStringToMap(componentCommon.get_json_conf()) - Option(componentConfig.get(Config.TOPOLOGY_TRANSACTIONAL_ID)) - .foreach(config.put(Config.TOPOLOGY_TRANSACTIONAL_ID, _)) - Option(componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS)) - .foreach(config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, _)) - config - } - - private def askAppMasterForDAG(appMaster: ActorRef): DAG = { - implicit val timeout = Constants.FUTURE_TIMEOUT - val dagFuture = (appMaster ? GetDAG).asInstanceOf[Future[DAG]] - Await.result(dagFuture, timeout.duration) - } - - private def buildGeneralTopologyContext(dag: DAG, topology: StormTopology, stormConf: JMap[_, _]) - : GeneralTopologyContext = { - val taskToComponent = getTaskToComponent(dag) - val componentToSortedTasks: JMap[String, JList[Integer]] = - getComponentToSortedTasks(taskToComponent) - val componentToStreamFields: JMap[String, JMap[String, Fields]] = - getComponentToStreamFields(topology) - new GeneralTopologyContext( - topology, stormConf, taskToComponent.asJava, componentToSortedTasks, - componentToStreamFields, null) - } - - private def buildTopologyContext( - dag: DAG, topology: StormTopology, stormConf: JMap[_, _], stormTaskId: Integer) - : TopologyContext = { - val taskToComponent = getTaskToComponent(dag) - val componentToSortedTasks: JMap[String, JList[Integer]] = - getComponentToSortedTasks(taskToComponent) - val componentToStreamFields: JMap[String, JMap[String, Fields]] = - getComponentToStreamFields(topology) - val codeDir = mkCodeDir - val pidDir = mkPidDir - - new TopologyContext(topology, stormConf, taskToComponent.asJava, componentToSortedTasks, - componentToStreamFields, null, codeDir, pidDir, stormTaskId, null, null, null, null, - new 
JHashMap[String, AnyRef], new JHashMap[Integer, JMap[Integer, JMap[String, IMetric]]], - new Atom(false)) - } - - private def getComponentToStreamFields(topology: StormTopology) - : JMap[String, JMap[String, Fields]] = { - val spouts = topology.get_spouts().asScala - val bolts = topology.get_bolts().asScala - - val spoutFields = spouts.map { - case (id, component) => id -> getComponentToFields(component.get_common()) - } - - val boltFields = bolts.map { - case (id, component) => id -> getComponentToFields(component.get_common()) - } - - val systemFields = Map(SYSTEM_COMPONENT_ID -> - Map(SYSTEM_TICK_STREAM_ID -> new Fields(SYSTEM_COMPONENT_OUTPUT_FIELDS)).asJava) - - (spoutFields ++ boltFields ++ systemFields).asJava - } - - private def getComponentToFields(common: ComponentCommon): JMap[String, Fields] = { - val streams = common.get_streams.asScala - streams.map { case (sid, stream) => - sid -> new Fields(stream.get_output_fields()) - }.asJava - } - - private def getComponentToSortedTasks( - taskToComponent: Map[Integer, String]): JMap[String, JList[Integer]] = { - taskToComponent.groupBy(_._2).map { case (component, map) => - val sortedTasks = map.keys.toList.sorted.asJava - component -> sortedTasks - }.asJava - } - - private def getTaskToComponent(dag: DAG): Map[Integer, String] = { - val taskToComponent = dag.processors.flatMap { case (processorId, processorDescription) => - val parallelism = processorDescription.parallelism - val component = processorDescription.taskConf.getString(STORM_COMPONENT).get - (0 until parallelism).map(index => - gearpumpTaskIdToStorm(TaskId(processorId, index)) -> component) - } - taskToComponent - } - - // Workarounds to support storm ShellBolt - private def mkPidDir: String = { - val pidDir = FileUtils.getTempDirectoryPath + File.separator + "pid" - try { - FileUtils.forceMkdir(new File(pidDir)) - } catch { - case ex: IOException => - LOG.error(s"failed to create pid directory $pidDir") - } - pidDir - } - - // a workaround to 
support storm ShellBolt - private def mkCodeDir: String = { - val jarPath = System.getProperty("java.class.path").split(":").last - val destDir = FileUtils.getTempDirectoryPath + File.separator + "storm" - - try { - FileUtils.forceMkdir(new File(destDir)) - - val jar = new JarFile(jarPath) - val enumEntries = jar.entries().asScala - enumEntries.foreach { entry => - val file = new File(destDir + File.separator + entry.getName) - if (!entry.isDirectory) { - file.getParentFile.mkdirs() - - val is = jar.getInputStream(entry) - val fos = new FileOutputStream(file) - try { - IOUtils.copy(is, fos) - } catch { - case ex: IOException => - LOG.error(s"failed to copy data from ${entry.getName} to ${file.getName}") - } finally { - fos.close() - is.close() - } - } - } - } catch { - case ex: IOException => - LOG.error(s"could not extract $destDir from $jarPath") - } - - destDir + File.separator + "resources" - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala deleted file mode 100644 index 5c3fc3e..0000000 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala +++ /dev/null @@ -1,321 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.storm.topology - -import java.lang.{Iterable => JIterable} -import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap} - -import akka.actor.ActorSystem -import backtype.storm.Config -import backtype.storm.generated._ -import backtype.storm.utils.{ThriftTopologyUtils, Utils} -import org.slf4j.Logger - -import io.gearpump.cluster.UserConfig -import io.gearpump.experiments.storm.processor.StormProcessor -import io.gearpump.experiments.storm.producer.StormProducer -import io.gearpump.experiments.storm.util.StormConstants._ -import io.gearpump.experiments.storm.util.StormUtil -import io.gearpump.experiments.storm.util.StormUtil._ -import io.gearpump.streaming.Processor -import io.gearpump.streaming.task.Task -import io.gearpump.util.LogUtil - -// TODO: Refactor this file, we should disable using of JavaConversions -// scalastyle:off javaconversions -import scala.collection.JavaConversions._ -// scalastyle:on javaconversions - -object GearpumpStormTopology { - private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpStormTopology]) - - def apply( - name: String, - topology: StormTopology, - appConfigInJson: String)(implicit system: ActorSystem): GearpumpStormTopology = { - new GearpumpStormTopology( - name, - topology, - Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]], - parseJsonStringToMap(appConfigInJson) - ) - } -} - -/** - * this is a wrapper over Storm topology which - * 1. merges Storm and Gearpump configs - * 2. creates Gearpump processors - * 3. 
provides interface for Gearpump applications to use Storm topology - * - * an implicit ActorSystem is required to create Gearpump processors - * @param name topology name - * @param topology Storm topology - * @param sysConfig configs from "defaults.yaml" and custom config file - * @param appConfig config submitted from user application - */ -private[storm] class GearpumpStormTopology( - name: String, - topology: StormTopology, - sysConfig: JMap[AnyRef, AnyRef], - appConfig: JMap[AnyRef, AnyRef])(implicit system: ActorSystem) { - - private val spouts = topology.get_spouts() - private val bolts = topology.get_bolts() - private val stormConfig = mergeConfigs(sysConfig, appConfig, getComponentConfigs(spouts, bolts)) - private val spoutProcessors = spouts.map { case (id, spout) => - id -> spoutToProcessor(id, spout, stormConfig.toMap) - }.toMap - private val boltProcessors = bolts.map { case (id, bolt) => - id -> boltToProcessor(id, bolt, stormConfig.toMap) - }.toMap - private val allProcessors = spoutProcessors ++ boltProcessors - - /** - * @return merged Storm config with priority - * defaults.yaml < custom file config < application config < component config - */ - def getStormConfig: JMap[AnyRef, AnyRef] = stormConfig - - /** - * @return Storm components to Gearpump processors - */ - def getProcessors: Map[String, Processor[Task]] = allProcessors - - /** - * @param sourceId source component id - * @return target Storm components and Gearpump processors - */ - def getTargets(sourceId: String): Map[String, Processor[Task]] = { - getTargets(sourceId, topology).map { case (targetId, _) => - targetId -> boltProcessors(targetId) - } - } - - /** - * merge configs from application, custom config file and component - */ - private def mergeConfigs( - sysConfig: JMap[AnyRef, AnyRef], - appConfig: JMap[AnyRef, AnyRef], - componentConfigs: Iterable[JMap[AnyRef, AnyRef]]): JMap[AnyRef, AnyRef] = { - val allConfig = new JHashMap[AnyRef, AnyRef] - allConfig.putAll(sysConfig) - 
allConfig.putAll(appConfig) - allConfig.putAll(getMergedComponentConfig(componentConfigs, allConfig.toMap)) - allConfig.put(Config.TOPOLOGY_NAME, name) - allConfig - } - - /** - * creates Gearpump processor from Storm spout - * @param spoutId spout id - * @param spoutSpec spout spec - * @param stormConfig merged storm config - * @param system actor system - * @return a Processor[StormProducer] - */ - private def spoutToProcessor(spoutId: String, spoutSpec: SpoutSpec, - stormConfig: Map[AnyRef, AnyRef])(implicit system: ActorSystem): Processor[Task] = { - val componentCommon = spoutSpec.get_common() - val taskConf = UserConfig.empty - .withString(STORM_COMPONENT, spoutId) - val parallelism = getParallelism(stormConfig, componentCommon) - Processor[StormProducer](parallelism, spoutId, taskConf) - } - - /** - * creates Gearpump processor from Storm bolt - * @param boltId bolt id - * @param boltSpec bolt spec - * @param stormConfig merged storm config - * @param system actor system - * @return a Processor[StormProcessor] - */ - private def boltToProcessor(boltId: String, boltSpec: Bolt, - stormConfig: Map[AnyRef, AnyRef])(implicit system: ActorSystem): Processor[Task] = { - val componentCommon = boltSpec.get_common() - val taskConf = UserConfig.empty - .withString(STORM_COMPONENT, boltId) - .withBoolean("state.checkpoint.enable", StormUtil.ackEnabled(stormConfig)) - val parallelism = getParallelism(stormConfig, componentCommon) - Processor[StormProcessor](parallelism, boltId, taskConf) - } - - /** - * @return target components and streams - */ - private def getTargets(componentId: String, topology: StormTopology) - : Map[String, Map[String, Grouping]] = { - val componentIds = ThriftTopologyUtils.getComponentIds(topology) - componentIds.flatMap { otherComponentId => - getInputs(otherComponentId, topology).toList.map(otherComponentId -> _) - }.foldLeft(Map.empty[String, Map[String, Grouping]]) { - (allTargets, componentAndInput) => - val (otherComponentId, 
(globalStreamId, grouping)) = componentAndInput - val inputStreamId = globalStreamId.get_streamId() - val inputComponentId = globalStreamId.get_componentId - if (inputComponentId.equals(componentId)) { - val curr = allTargets.getOrElse(otherComponentId, Map.empty[String, Grouping]) - allTargets + (otherComponentId -> (curr + (inputStreamId -> grouping))) - } else { - allTargets - } - } - } - - /** - * @return input stream and grouping for a Storm component - */ - private def getInputs(componentId: String, topology: StormTopology) - : JMap[GlobalStreamId, Grouping] = { - ThriftTopologyUtils.getComponentCommon(topology, componentId).get_inputs - } - - /** - * get Storm component parallelism according to the following rule, - * 1. use "topology.tasks" if defined; otherwise use parallelism_hint - * 2. parallelism should not be larger than "topology.max.task.parallelism" if defined - * 3. component config overrides system config - * @param stormConfig System configs without merging "topology.tasks" and - * "topology.max.task.parallelism" of component - * @return number of task instances for a component - */ - private def getParallelism(stormConfig: Map[AnyRef, AnyRef], component: ComponentCommon): Int = { - val parallelismHint: Int = if (component.is_set_parallelism_hint()) { - component.get_parallelism_hint() - } else { - 1 - } - val mergedConfig = new JHashMap[AnyRef, AnyRef] - val componentConfig = parseJsonStringToMap(component.get_json_conf) - mergedConfig.putAll(stormConfig) - mergedConfig.putAll(componentConfig) - val numTasks: Int = getInt(mergedConfig, Config.TOPOLOGY_TASKS).getOrElse(parallelismHint) - val parallelism: Int = getInt(mergedConfig, Config.TOPOLOGY_MAX_TASK_PARALLELISM) - .fold(numTasks)(p => math.min(p, numTasks)) - parallelism - } - - private def getComponentConfigs(spouts: JMap[String, SpoutSpec], - bolts: JMap[String, Bolt]): Iterable[JMap[AnyRef, AnyRef]] = { - spouts.map { case (id, spoutSpec) => - 
parseJsonStringToMap(spoutSpec.get_common().get_json_conf()) - } ++ bolts.map { case (id, boltSpec) => - parseJsonStringToMap(boltSpec.get_common().get_json_conf()) - } - } - - /** - * merge component configs "topology.kryo.decorators" and "topology.kryo.register" - * @param componentConfigs list of component configs - * @param allConfig existing configs without merging component configs - * @return the two configs merged from all the component configs and existing configs - */ - private def getMergedComponentConfig(componentConfigs: Iterable[JMap[AnyRef, AnyRef]], - allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = { - val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef] - mergedConfig.putAll(getMergedKryoDecorators(componentConfigs, allConfig)) - mergedConfig.putAll(getMergedKryoRegister(componentConfigs, allConfig)) - mergedConfig - } - - /** - * @param componentConfigs list of component configs - * @param allConfig existing configs without merging component configs - * @return a merged config with a list of distinct kryo decorators from component and - * existing configs - */ - private def getMergedKryoDecorators(componentConfigs: Iterable[JMap[AnyRef, AnyRef]], - allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = { - val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef](1) - val key = Config.TOPOLOGY_KRYO_DECORATORS - val configs = getConfigValues(componentConfigs, allConfig, key) - val distincts = configs.foldLeft(Set.empty[String]) { - case (accum, config: JIterable[_]) => - accum ++ config.map { - case s: String => s - case illegal => - throw new IllegalArgumentException(s"$key must be a List of Strings; actually $illegal") - } - case (accum, null) => - accum - case illegal => - throw new IllegalArgumentException(s"$key must be a List of Strings; actually $illegal") - } - if (distincts.nonEmpty) { - val decorators: JList[String] = new JArrayList(distincts.size) - decorators.addAll(distincts) - 
mergedConfig.put(key, decorators) - } - mergedConfig - } - - /** - * @param componentConfigs list of component configs - * @param allConfig existing configs without merging component configs - * @return a merged config with component config overriding existing configs - */ - private def getMergedKryoRegister(componentConfigs: Iterable[JMap[AnyRef, AnyRef]], - allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = { - val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef](1) - val key = Config.TOPOLOGY_KRYO_REGISTER - val configs = getConfigValues(componentConfigs, allConfig, key) - val merged = configs.foldLeft(Map.empty[String, String]) { - case (accum, config: JIterable[_]) => - accum ++ config.map { - case m: JMap[_, _] => - m.map { - case (k: String, v: String) => k -> v - case illegal => - throw new IllegalArgumentException( - s"each element of $key must be a String or a Map of Strings; actually $illegal") - } - case s: String => - Map(s -> null) - case illegal => - throw new IllegalArgumentException(s"each element of $key must be a String or " + - s"a Map of Strings; actually $illegal") - }.reduce(_ ++ _) - case (accum, null) => - accum - case (accum, illegal) => - throw new IllegalArgumentException( - s"$key must be an Iterable containing only Strings or Maps of Strings; actually $illegal") - } - if (merged.nonEmpty) { - val registers: JMap[String, String] = new JHashMap[String, String](merged.size) - registers.putAll(merged) - mergedConfig.put(key, registers) - } - mergedConfig - } - - /** - * @param componentConfigs list of raw component configs - * @param allConfig existing configs without merging component configs - * @param key config key - * @return a list of values for a config from both component configs and existing configs - */ - private def getConfigValues(componentConfigs: Iterable[JMap[AnyRef, AnyRef]], - allConfig: Map[AnyRef, AnyRef], key: String): Iterable[AnyRef] = { - componentConfigs.map(config => config.get(key)) ++ 
allConfig.get(key).toList - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala deleted file mode 100644 index ee54add..0000000 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.storm.topology - -import java.util.{List => JList} - -import backtype.storm.task.GeneralTopologyContext -import backtype.storm.tuple.{Tuple, TupleImpl} - -import io.gearpump.TimeStamp - -/** - * this carries Storm tuple values in the Gearpump world - * the targetPartitions field dictate which tasks a GearpumpTuple should be sent to - * see [[io.gearpump.experiments.storm.partitioner.StormPartitioner]] for more info - */ -private[storm] class GearpumpTuple( - val values: JList[AnyRef], - val sourceTaskId: Integer, - val sourceStreamId: String, - @transient val targetPartitions: Map[String, Array[Int]]) extends Serializable { - /** - * creates a Storm [[backtype.storm.tuple.Tuple]] to be passed to a Storm component - * this is needed for each incoming message - * because we cannot get [[backtype.storm.task.GeneralTopologyContext]] at deserialization - * @param topologyContext topology context used for all tasks - * @return a Tuple - */ - def toTuple(topologyContext: GeneralTopologyContext, timestamp: TimeStamp): Tuple = { - TimedTuple(topologyContext, values, sourceTaskId, sourceStreamId, timestamp) - } - - def canEqual(other: Any): Boolean = other.isInstanceOf[GearpumpTuple] - - override def equals(other: Any): Boolean = other match { - case that: GearpumpTuple => - (that canEqual this) && - values == that.values && - sourceTaskId == that.sourceTaskId && - sourceStreamId == that.sourceStreamId - case _ => false - } - - override def hashCode(): Int = { - val state = Seq(values, sourceTaskId, sourceStreamId) - state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b) - } -} - -case class TimedTuple(topologyContext: GeneralTopologyContext, tuple: JList[AnyRef], - sourceTaskId: Integer, sourceStreamId: String, timestamp: TimeStamp) - extends TupleImpl(topologyContext, tuple, sourceTaskId, sourceStreamId, null) - 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/GraphBuilder.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/GraphBuilder.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/GraphBuilder.scala deleted file mode 100644 index 041a90c..0000000 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/GraphBuilder.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.storm.util - -import io.gearpump.experiments.storm.partitioner.StormPartitioner -import io.gearpump.experiments.storm.topology.GearpumpStormTopology -import io.gearpump.partitioner.Partitioner -import io.gearpump.streaming.Processor -import io.gearpump.streaming.task.Task -import io.gearpump.util.Graph - -object GraphBuilder { - - /** - * build a Gearpump DAG from a Storm topology - * @param topology a wrapper over Storm topology - * @return a DAG - */ - def build(topology: GearpumpStormTopology): Graph[Processor[_ <: Task], _ <: Partitioner] = { - val processorGraph = Graph.empty[Processor[Task], Partitioner] - - topology.getProcessors.foreach { case (sourceId, sourceProcessor) => - topology.getTargets(sourceId).foreach { case (targetId, targetProcessor) => - processorGraph.addEdge(sourceProcessor, new StormPartitioner(targetId), targetProcessor) - } - } - - processorGraph - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala deleted file mode 100644 index 91277cd..0000000 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
/**
 * Grouper mirrors Storm's grouping semantics but returns Gearpump partition
 * indices for a Storm tuple's values.
 */
sealed trait Grouper {
  /**
   * @param taskId Storm task id of the source task
   * @param values Storm tuple values
   * @return Gearpump partitions the tuple should be sent to
   */
  def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int]
}

/** GlobalGrouper routes every tuple to partition 0. */
class GlobalGrouper extends Grouper {
  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = Array(0)
}

/**
 * NoneGrouper picks a partition uniformly at random.
 *
 * @param numTasks number of target tasks
 */
class NoneGrouper(numTasks: Int) extends Grouper {
  private val random = new Random

  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] =
    Array(StormUtil.mod(random.nextInt, numTasks))
}

/**
 * ShuffleGrouper deals out a shuffled permutation of all partitions one at a
 * time; once the permutation is exhausted it reshuffles and starts over.
 *
 * @param numTasks number of target tasks
 */
class ShuffleGrouper(numTasks: Int) extends Grouper {
  private val random = new Random
  // Position of the next partition to hand out; -1 means "not started yet".
  private var index = -1
  private var partitions = List.empty[Int]

  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
    index += 1
    if (partitions.isEmpty) {
      // First call: build and shuffle the initial permutation.
      partitions = random.shuffle((0 until numTasks).toList)
    } else if (index >= numTasks) {
      // Permutation exhausted: reshuffle and restart.
      index = 0
      partitions = random.shuffle(partitions)
    }
    Array(partitions(index))
  }
}

/**
 * FieldsGrouper hashes the values of the grouping fields so that equal keys
 * always land on the same partition.
 *
 * @param outFields declared output fields of the source task
 * @param groupFields grouping fields of the target tasks
 * @param numTasks number of target tasks
 */
class FieldsGrouper(outFields: Fields, groupFields: Fields, numTasks: Int) extends Grouper {

  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
    val keyHash = outFields.select(groupFields, values).hashCode()
    Array(StormUtil.mod(keyHash, numTasks))
  }
}

/**
 * AllGrouper broadcasts: every tuple goes to all partitions.
 *
 * @param numTasks number of target tasks
 */
class AllGrouper(numTasks: Int) extends Grouper {
  // Precomputed once; the same array instance is returned on every call.
  val partitions = (0 until numTasks).toArray

  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = partitions
}

/**
 * CustomGrouper delegates to a user-supplied Storm grouping implementation and
 * maps the Storm task ids it chooses back to Gearpump partition indices.
 *
 * @param grouping see [[backtype.storm.grouping.CustomStreamGrouping]]
 */
class CustomGrouper(grouping: CustomStreamGrouping) extends Grouper {

  def prepare(
      topologyContext: TopologyContext, globalStreamId: GlobalStreamId,
      targetTasks: JList[Integer]): Unit = {
    grouping.prepare(topologyContext, globalStreamId, targetTasks)
  }

  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
    val chosen = grouping.chooseTasks(taskId, values)
    val result = new Array[Int](chosen.size())
    var i = 0
    while (i < result.length) {
      // Translate each Storm task id into the index of its Gearpump TaskId.
      result(i) = StormUtil.stormTaskIdToGearpump(chosen.get(i)).index
      i += 1
    }
    result
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormConstants.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormConstants.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormConstants.scala deleted file mode 100644 index 928d07b..0000000 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormConstants.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/** Shared constants for the Storm-on-Gearpump compatibility layer. */
object StormConstants {
  // UserConfig keys under which Storm artifacts are handed to Gearpump tasks.
  val STORM_COMPONENT = "storm_component"
  val STORM_TOPOLOGY = "storm_topology"
  val STORM_CONFIG = "storm_config"
  // Identifiers of Storm's built-in system component, its output field,
  // its reserved task id, and its tick stream.
  val SYSTEM_COMPONENT_ID = "__system"
  val SYSTEM_COMPONENT_OUTPUT_FIELDS = "rate_secs"
  val SYSTEM_TASK_ID: Integer = -1
  val SYSTEM_TICK_STREAM_ID = "__tick"

  // Interval between checkpoints, in milliseconds (2 seconds).
  val CHECKPOINT_INTERVAL_MILLIS = 2000

  // Config key selecting the serialization framework used for Storm tuples.
  val STORM_SERIALIZATION_FRAMEWORK = "gearpump.storm.serialization-framework"
}
object StormOutputCollector {
  private val LOG: Logger = LogUtil.getLogger(classOf[StormOutputCollector])
  // Shared empty result for emits on streams that have no subscribers.
  private[storm] val EMPTY_LIST: JList[Integer] = new JArrayList[Integer](0)

  /**
   * Builds a collector for the Storm task running inside the given Gearpump task.
   *
   * Precomputes, for every (output stream, target component) pair with a
   * non-direct grouping, the [[Grouper]] that decides which Gearpump partitions
   * a tuple goes to.
   *
   * @param taskContext Gearpump task context used to emit messages downstream
   * @param topologyContext Storm topology context of this task
   */
  def apply(taskContext: TaskContext, topologyContext: TopologyContext): StormOutputCollector = {
    val stormTaskId = topologyContext.getThisTaskId
    val componentId = topologyContext.getThisComponentId
    val taskToComponent = topologyContext.getTaskToComponent
    val componentToProcessorId = getComponentToProcessorId(taskToComponent.asScala.toMap)
    val targets = topologyContext.getTargets(componentId)
    // One grouper per (stream, target component). Keying by stream id alone (as the
    // previous version did) silently collapsed the map when several components
    // subscribed to the same stream, so all of them were routed with one arbitrary
    // target's grouping and task count.
    val streamGroupers: Map[(String, String), Grouper] =
      targets.asScala.flatMap { case (streamId, targetGrouping) =>
        targetGrouping.asScala.collect { case (target, grouping) if !grouping.is_set_direct() =>
          (streamId, target) -> getGrouper(topologyContext, grouping, componentId, streamId, target)
        }
      }.toMap
    val getTargetPartitionsFn = (streamId: String, values: JList[AnyRef]) => {
      getTargetPartitions(stormTaskId, streamId, targets,
        streamGroupers, componentToProcessorId, values)
    }
    new StormOutputCollector(stormTaskId, taskToComponent, targets, getTargetPartitionsFn,
      taskContext, LatestTime)
  }

  /**
   * Gets target Gearpump partitions per target component, and the flat list of
   * Storm task ids those partitions correspond to (returned to the Storm collector).
   */
  private def getTargetPartitions(
      stormTaskId: Int,
      streamId: String,
      targets: JMap[String, JMap[String, Grouping]],
      streamGroupers: Map[(String, String), Grouper],
      componentToProcessorId: Map[String, ProcessorId],
      values: JList[AnyRef]): (Map[String, Array[Int]], JList[Integer]) = {
    val targetStormTaskIds: JList[Integer] = new JArrayList[Integer](targets.size)

    @annotation.tailrec
    def collectTargets(iter: JIterator[String],
        accum: Map[String, Array[Int]]): Map[String, Array[Int]] = {
      if (iter.hasNext) {
        val target = iter.next
        streamGroupers.get((streamId, target)) match {
          case Some(grouper) =>
            val partitions = grouper.getPartitions(stormTaskId, values)
            partitions.foreach { partition =>
              // Translate each chosen Gearpump partition back to a Storm task id.
              targetStormTaskIds.add(
                gearpumpTaskIdToStorm(TaskId(componentToProcessorId(target), partition)))
            }
            collectTargets(iter, accum + (target -> partitions))
          case None =>
            // Direct-grouped target: tuples reach it via emitDirect, not here.
            collectTargets(iter, accum)
        }
      } else {
        accum
      }
    }

    val targetPartitions = collectTargets(targets.get(streamId).keySet().iterator,
      Map.empty[String, Array[Int]])
    (targetPartitions, targetStormTaskIds)
  }

  /** Maps each Storm component id to the Gearpump processor id backing it. */
  private def getComponentToProcessorId(taskToComponent: Map[Integer, String])
    : Map[String, ProcessorId] = {
    taskToComponent.map { case (id, component) =>
      component -> stormTaskIdToGearpump(id).processorId
    }
  }

  /**
   * Instantiates the [[Grouper]] matching a Storm [[Grouping]] declared by
   * `target` on the `(source, streamId)` stream. Direct grouping is rejected:
   * those tuples are routed by [[StormOutputCollector.emitDirect]].
   */
  private def getGrouper(topologyContext: TopologyContext, grouping: Grouping,
      source: String, streamId: String, target: String): Grouper = {
    val outFields = topologyContext.getComponentOutputFields(source, streamId)
    val targetTasks = topologyContext.getComponentTasks(target)
    val targetTaskNum = targetTasks.size
    val globalStreamId = new GlobalStreamId(source, streamId)

    grouping.getSetField match {
      case Grouping._Fields.FIELDS =>
        // Storm encodes global grouping as fields grouping on an empty field list.
        if (isGlobalGrouping(grouping)) {
          new GlobalGrouper
        } else {
          new FieldsGrouper(outFields, new Fields(grouping.get_fields()), targetTaskNum)
        }
      case Grouping._Fields.SHUFFLE =>
        new ShuffleGrouper(targetTaskNum)
      case Grouping._Fields.NONE =>
        new NoneGrouper(targetTaskNum)
      case Grouping._Fields.ALL =>
        new AllGrouper(targetTaskNum)
      case Grouping._Fields.CUSTOM_SERIALIZED =>
        val customGrouping = Utils.javaDeserialize(grouping.get_custom_serialized,
          classOf[Serializable]).asInstanceOf[CustomStreamGrouping]
        val grouper = new CustomGrouper(customGrouping)
        grouper.prepare(topologyContext, globalStreamId, targetTasks)
        grouper
      case Grouping._Fields.CUSTOM_OBJECT =>
        val customObject = grouping.get_custom_object()
        val customGrouping = instantiateJavaObject(customObject)
        val grouper = new CustomGrouper(customGrouping)
        grouper.prepare(topologyContext, globalStreamId, targetTasks)
        grouper
      case Grouping._Fields.LOCAL_OR_SHUFFLE =>
        // Gearpump has built-in support for sending messages to local actors,
        // so local-or-shuffle degenerates to plain shuffle here.
        new ShuffleGrouper(targetTaskNum)
      case Grouping._Fields.DIRECT =>
        throw new Exception("direct grouping should not be called here")
    }
  }

  private def isGlobalGrouping(grouping: Grouping): Boolean = {
    grouping.getSetField == Grouping._Fields.FIELDS &&
      grouping.get_fields.isEmpty
  }

  /** Reflectively constructs the custom grouping described by a Thrift [[JavaObject]]. */
  private def instantiateJavaObject(javaObject: JavaObject): CustomStreamGrouping = {
    val className = javaObject.get_full_class_name()
    val args = javaObject.get_args_list().asScala.map(_.getFieldValue)
    // `args: _*` spreads the constructor arguments. The previous code passed the
    // collection itself as a single argument, so any constructor looked up with
    // these parameter types failed with IllegalArgumentException at invocation.
    Class.forName(className).getConstructor(args.map(_.getClass): _*)
      .newInstance(args: _*).asInstanceOf[CustomStreamGrouping]
  }
}

/**
 * Provides common functionality for
 * [[io.gearpump.experiments.storm.producer.StormSpoutOutputCollector]]
 * and [[io.gearpump.experiments.storm.processor.StormBoltOutputCollector]]
 *
 * @param stormTaskId Storm task id of the emitting task
 * @param taskToComponent maps Storm task ids to component ids
 * @param targets subscriptions on this component's output streams
 * @param getTargetPartitionsFn resolves a (stream, values) emit to target
 *                              partitions and the matching Storm task ids
 * @param taskContext Gearpump task context messages are emitted through
 * @param timestamp event time attached to outgoing messages
 */
class StormOutputCollector(
    stormTaskId: Int,
    taskToComponent: JMap[Integer, String],
    targets: JMap[String, JMap[String, Grouping]],
    getTargetPartitionsFn: (String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer]),
    val taskContext: TaskContext,
    private var timestamp: TimeStamp) {
  import io.gearpump.experiments.storm.util.StormOutputCollector._

  /**
   * Emits tuple values into a stream (invoked by a Storm output collector).
   *
   * Wraps the values into a [[GearpumpTuple]] message together with the target
   * partitions so that [[io.gearpump.experiments.storm.partitioner.StormPartitioner]]
   * knows where to send it, and returns the corresponding Storm task ids.
   *
   * @param streamId Storm stream id
   * @param values Storm tuple values
   * @return target Storm task ids, empty when the stream has no subscribers
   */
  def emit(streamId: String, values: JList[AnyRef]): JList[Integer] = {
    if (targets.containsKey(streamId)) {
      val (targetPartitions, targetStormTaskIds) = getTargetPartitionsFn(streamId, values)
      val tuple = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions)
      taskContext.output(Message(tuple, timestamp))
      targetStormTaskIds
    } else {
      EMPTY_LIST
    }
  }

  /**
   * Emits tuple values to a specific Storm task (invoked by a Storm output collector).
   *
   * Translates the Storm task id into a Gearpump partition and passes it to
   * [[io.gearpump.experiments.storm.partitioner.StormPartitioner]] through the
   * targetPartitions field of [[io.gearpump.experiments.storm.topology.GearpumpTuple]].
   *
   * @param id Storm task id of the receiving task
   * @param streamId Storm stream id
   * @param values Storm tuple values
   */
  def emitDirect(id: Int, streamId: String, values: JList[AnyRef]): Unit = {
    if (targets.containsKey(streamId)) {
      val target = taskToComponent.get(id)
      val partition = stormTaskIdToGearpump(id).index
      val targetPartitions = Map(target -> Array(partition))
      val tuple = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions)
      taskContext.output(Message(tuple, timestamp))
    }
  }

  /** Sets the timestamp stamped onto subsequent emits (from each incoming Message). */
  def setTimestamp(timestamp: TimeStamp): Unit = {
    this.timestamp = timestamp
  }

  def getTimestamp: Long = timestamp
}
/**
 * Gearpump serialization framework backed by Storm's Kryo setup.
 *
 * A per-thread [[StormSerializer]] is kept in a ThreadLocal because Kryo
 * instances are not safe to share across threads.
 */
class StormSerializationFramework extends SerializationFramework {
  private var stormConfig: JMap[AnyRef, AnyRef] = null
  private var pool: ThreadLocal[Serializer] = null

  override def init(system: ExtendedActorSystem, config: UserConfig): Unit = {
    implicit val actorSystem = system
    stormConfig = config.getValue[JMap[AnyRef, AnyRef]](STORM_CONFIG).get
    pool = new ThreadLocal[Serializer]() {
      override def initialValue(): Serializer =
        new StormSerializer(SerializationFactory.getKryo(stormConfig))
    }
  }

  override def get(): Serializer = pool.get()
}

/**
 * Serializes / deserializes [[io.gearpump.experiments.storm.topology.GearpumpTuple]].
 *
 * Wire format: source task id (int), source stream id (string), then the tuple
 * values Kryo-encoded through Storm's [[backtype.storm.utils.ListDelegate]].
 *
 * @param kryo created by Storm [[backtype.storm.serialization.SerializationFactory]]
 */
class StormSerializer(kryo: Kryo) extends Serializer {
  // maxBufferSize -1 lets the output buffer grow up to 2147483647 bytes.
  private val output = new Output(4096, -1)
  private val input = new Input

  override def serialize(message: Any): Array[Byte] = {
    val tuple = message.asInstanceOf[GearpumpTuple]
    output.clear()
    output.writeInt(tuple.sourceTaskId)
    output.writeString(tuple.sourceStreamId)
    val delegate = new ListDelegate
    delegate.setDelegate(tuple.values)
    kryo.writeObject(output, delegate)
    output.toBytes
  }

  override def deserialize(msg: Array[Byte]): Any = {
    input.setBuffer(msg)
    val taskId: JInteger = input.readInt
    val streamId: String = input.readString
    val delegate = kryo.readObject[ListDelegate](input, classOf[ListDelegate])
    // targetPartitions is null here: it is only meaningful on the sending side.
    new GearpumpTuple(delegate.getDelegate, taskId, streamId, null)
  }
}
