http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
deleted file mode 100644
index aca2736..0000000
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.main
-
-import java.io.{File, FileOutputStream, FileWriter}
-import java.nio.ByteBuffer
-import java.nio.channels.{Channels, WritableByteChannel}
-import java.util.{HashMap => JHashMap, Map => JMap, UUID}
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
-
-import akka.actor.ActorSystem
-import com.typesafe.config.ConfigValueFactory
-import backtype.storm.Config
-import backtype.storm.generated._
-import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer}
-import backtype.storm.utils.Utils
-import org.apache.storm.shade.org.json.simple.JSONValue
-import org.apache.storm.shade.org.yaml.snakeyaml.Yaml
-import org.slf4j.Logger
-
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import io.gearpump.cluster.{MasterToAppMaster, UserConfig}
-import io.gearpump.experiments.storm.topology.GearpumpStormTopology
-import io.gearpump.experiments.storm.util.TimeCacheMapWrapper.Callback
-import io.gearpump.experiments.storm.util.{GraphBuilder, StormConstants, 
StormUtil, TimeCacheMapWrapper}
-import io.gearpump.streaming.StreamApplication
-import io.gearpump.util.{AkkaApp, Constants, LogUtil}
-
-object GearpumpNimbus extends AkkaApp with ArgumentsParser {
-  private val THRIFT_PORT = StormUtil.getThriftPort()
-  private val OUTPUT = "output"
-  private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpNimbus])
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    OUTPUT -> CLIOption[String]("<output path for configuration file>",
-      required = false, defaultValue = Some("app.yaml"))
-  )
-
-  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
-    val parsed = parse(args)
-    val output = parsed.getString(OUTPUT)
-    val akkaConf = updateClientConfig(inputAkkaConf)
-    val system = ActorSystem("storm", akkaConf)
-
-    val clientContext = new ClientContext(akkaConf, system, null)
-    val stormConf = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]]
-    val thriftConf: JMap[AnyRef, AnyRef] = Map(
-      Config.NIMBUS_HOST -> akkaConf.getString(Constants.GEARPUMP_HOSTNAME),
-      Config.NIMBUS_THRIFT_PORT -> 
s"$THRIFT_PORT").asJava.asInstanceOf[JMap[AnyRef, AnyRef]]
-    updateStormConfig(thriftConf, output)
-    stormConf.putAll(thriftConf)
-
-    import scala.concurrent.ExecutionContext.Implicits.global
-    Future {
-      val thriftServer = createServer(clientContext, stormConf)
-      thriftServer.serve()
-    }
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  private def createServer(
-      clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]): 
ThriftServer = {
-    val processor = new Nimbus.Processor[GearpumpNimbus](new 
GearpumpNimbus(clientContext,
-      stormConf))
-    val connectionType = ThriftConnectionType.NIMBUS
-    new ThriftServer(stormConf, processor, connectionType)
-  }
-
-  private def updateStormConfig(thriftConfig: JMap[AnyRef, AnyRef], output: 
String): Unit = {
-    val updatedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
-    val outputConfig = Utils.findAndReadConfigFile(output, 
false).asInstanceOf[JMap[AnyRef, AnyRef]]
-    updatedConfig.putAll(outputConfig)
-    updatedConfig.putAll(thriftConfig)
-    val yaml = new Yaml
-    val serialized = yaml.dumpAsMap(updatedConfig)
-    val writer = new FileWriter(new File(output))
-    try {
-      writer.write(serialized)
-    } catch {
-      case e: Exception => throw e
-    } finally {
-      writer.close()
-    }
-  }
-
-  import io.gearpump.util.Constants._
-  private def updateClientConfig(config: Config): Config = {
-    val storm = s"<${GEARPUMP_HOME}>/lib/storm/*"
-    val appClassPath = s"$storm${File.pathSeparator}" +
-      config.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH)
-    val executorClassPath = s"$storm${File.pathSeparator}" +
-      config.getString(Constants.GEARPUMP_EXECUTOR_EXTRA_CLASSPATH)
-
-    val updated = config
-      .withValue(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, 
ConfigValueFactory.fromAnyRef(appClassPath))
-      .withValue(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH,
-        ConfigValueFactory.fromAnyRef(executorClassPath))
-
-    if (config.hasPath(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) {
-      val serializerConfig = ConfigValueFactory.fromAnyRef(
-        config.getString(StormConstants.STORM_SERIALIZATION_FRAMEWORK))
-      updated.withValue(GEARPUMP_SERIALIZER_POOL, serializerConfig)
-    } else {
-      updated
-    }
-  }
-}
-
-class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, 
AnyRef])
-  extends Nimbus.Iface {
-
-  import io.gearpump.experiments.storm.main.GearpumpNimbus._
-
-  private var applications = Map.empty[String, Int]
-  private var topologies = Map.empty[String, TopologyData]
-  private val expireSeconds = StormUtil.getInt(stormConf,
-    Config.NIMBUS_FILE_COPY_EXPIRATION_SECS).get
-  private val expiredCallback = new Callback[String, WritableByteChannel] {
-    override def expire(k: String, v: WritableByteChannel): Unit = {
-      v.close()
-    }
-  }
-  private val fileCacheMap = new TimeCacheMapWrapper[String, 
WritableByteChannel](expireSeconds,
-    expiredCallback)
-
-  override def submitTopology(
-      name: String, uploadedJarLocation: String, jsonConf: String, topology: 
StormTopology)
-    : Unit = {
-    submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology,
-      new SubmitOptions(TopologyInitialStatus.ACTIVE))
-  }
-
-  override def submitTopologyWithOpts(
-      name: String, uploadedJarLocation: String,
-      jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit 
= {
-    LOG.info(s"Submitted topology $name")
-    implicit val system = clientContext.system
-    val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf)
-    val stormConfig = gearpumpStormTopology.getStormConfig
-    val workerNum = StormUtil.getInt(stormConfig, 
Config.TOPOLOGY_WORKERS).getOrElse(1)
-    val processorGraph = GraphBuilder.build(gearpumpStormTopology)
-    val config = UserConfig.empty
-      .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology)
-      .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, 
stormConfig)
-    val app = StreamApplication(name, processorGraph, config)
-    LOG.info(s"jar file uploaded to $uploadedJarLocation")
-    val appId = clientContext.submit(app, uploadedJarLocation, workerNum)
-    applications += name -> appId
-    topologies += name -> TopologyData(topology, stormConfig, 
uploadedJarLocation)
-    LOG.info(s"Storm Application $appId submitted")
-  }
-
-  override def killTopologyWithOpts(name: String, options: KillOptions): Unit 
= {
-    if (applications.contains(name)) {
-      clientContext.shutdown(applications(name))
-      removeTopology(name)
-      LOG.info(s"Killed topology $name")
-    } else {
-      throw new RuntimeException(s"topology $name not found")
-    }
-  }
-
-  override def getNimbusConf: String = {
-    JSONValue.toJSONString(stormConf)
-  }
-
-  override def getTopology(name: String): StormTopology = {
-    updateApps()
-    topologies.getOrElse(name,
-      throw new RuntimeException(s"topology $name not found")).topology
-  }
-
-  override def getTopologyConf(name: String): String = {
-    updateApps()
-    JSONValue.toJSONString(topologies.getOrElse(name,
-      throw new RuntimeException(s"topology $name not found")).config)
-  }
-
-  override def getUserTopology(id: String): StormTopology = getTopology(id)
-
-  override def beginFileUpload(): String = {
-    val file = File.createTempFile(s"storm-jar-${UUID.randomUUID()}", ".jar")
-    val location = file.getAbsolutePath
-    val channel = Channels.newChannel(new FileOutputStream(location))
-    fileCacheMap.put(location, channel)
-    LOG.info(s"Uploading file from client to $location")
-    location
-  }
-
-  override def uploadChunk(location: String, chunk: ByteBuffer): Unit = {
-    if (!fileCacheMap.containsKey(location)) {
-      throw new RuntimeException(s"File for $location does not exist (or timed 
out)")
-    } else {
-      val channel = fileCacheMap.get(location)
-      channel.write(chunk)
-      fileCacheMap.put(location, channel)
-    }
-  }
-
-  override def finishFileUpload(location: String): Unit = {
-    if (!fileCacheMap.containsKey(location)) {
-      throw new RuntimeException(s"File for $location does not exist (or timed 
out)")
-    } else {
-      val channel = fileCacheMap.get(location)
-      channel.close()
-      fileCacheMap.remove(location)
-    }
-  }
-
-  override def getClusterInfo: ClusterSummary = {
-    updateApps()
-    val topologySummaryList = topologies.map { case (name, _) =>
-      new TopologySummary(name, name, 0, 0, 0, 0, "")
-    }.toSeq
-    new ClusterSummary(List[SupervisorSummary]().asJava, 0, 
topologySummaryList.asJava)
-  }
-
-  override def beginFileDownload(file: String): String = {
-    throw new UnsupportedOperationException
-  }
-
-  override def uploadNewCredentials(s: String, credentials: Credentials): Unit 
= {
-    throw new UnsupportedOperationException
-  }
-  override def activate(name: String): Unit = {
-    throw new UnsupportedOperationException
-  }
-
-  override def rebalance(name: String, options: RebalanceOptions): Unit = {
-    throw new UnsupportedOperationException
-  }
-
-  override def deactivate(name: String): Unit = {
-    throw new UnsupportedOperationException
-  }
-
-  override def getTopologyInfo(name: String): TopologyInfo = {
-    throw new UnsupportedOperationException
-  }
-
-  override def getTopologyInfoWithOpts(s: String, getInfoOptions: 
GetInfoOptions): TopologyInfo = {
-    throw new UnsupportedOperationException
-  }
-
-  override def killTopology(name: String): Unit = killTopologyWithOpts(name, 
new KillOptions())
-
-  override def downloadChunk(name: String): ByteBuffer = {
-    throw new UnsupportedOperationException
-  }
-
-  private def updateApps(): Unit = {
-    clientContext.listApps.appMasters.foreach { app =>
-      val name = app.appName
-      if (applications.contains(name)) {
-        if (app.status != MasterToAppMaster.AppMasterActive) {
-          removeTopology(name)
-        }
-      }
-    }
-  }
-
-  private def removeTopology(name: String): Unit = {
-    applications -= name
-    val jar = topologies(name).jar
-    new File(jar).delete()
-    topologies -= name
-  }
-}
-
-case class TopologyData(topology: StormTopology, config: JMap[AnyRef, AnyRef], 
jar: String)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
deleted file mode 100644
index fbdd579..0000000
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.main
-
-import backtype.storm.Config
-import backtype.storm.utils.Utils
-
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import io.gearpump.util.Constants._
-import io.gearpump.util.{AkkaApp, LogUtil, Util}
-
-object GearpumpStormClient extends AkkaApp with ArgumentsParser {
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "jar" -> CLIOption[String]("<storm jar>", required = true),
-    "config" -> CLIOption[Int]("<storm config file>", required = true),
-    "verbose" -> CLIOption("<print verbose log on console>", required = false,
-      defaultValue = Some(false)))
-
-  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(args)
-
-    val verbose = config.getBoolean("verbose")
-    if (verbose) {
-      LogUtil.verboseLogToConsole()
-    }
-
-    val jar = config.getString("jar")
-    val stormConfig = config.getString("config")
-    val topology = config.remainArgs(0)
-    val stormArgs = config.remainArgs.drop(1)
-    val stormOptions = Array(
-      s"-Dstorm.options=${getThriftOptions(stormConfig)}",
-      s"-Dstorm.jar=$jar",
-      s"-Dstorm.conf.file=$stormConfig",
-      s"-D${PREFER_IPV4}=true"
-    )
-
-    val gearpumpHome = System.getProperty(GEARPUMP_HOME)
-    val classPath = Array(s"$gearpumpHome/lib/*", 
s"$gearpumpHome/lib/storm/*", jar)
-    val process = Util.startProcess(stormOptions, classPath, topology, 
stormArgs)
-
-    // Waits till the process exit
-    val exit = process.exitValue()
-
-    if (exit != 0) {
-      throw new Exception(s"failed to submit jar, exit code $exit, " +
-        s"error summary: ${process.logger.error}")
-    }
-  }
-
-  private def getThriftOptions(stormConfig: String): String = {
-    val config = Utils.findAndReadConfigFile(stormConfig, true)
-    val host = config.get(Config.NIMBUS_HOST)
-    val thriftPort = config.get(Config.NIMBUS_THRIFT_PORT)
-    s"${Config.NIMBUS_HOST}=$host,${Config.NIMBUS_THRIFT_PORT}=$thriftPort"
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala
deleted file mode 100644
index e47e11d..0000000
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.partitioner
-
-import io.gearpump.Message
-import io.gearpump.experiments.storm.topology.GearpumpTuple
-import io.gearpump.partitioner.{MulticastPartitioner, Partitioner}
-
-/**
- * Partitioner bound to a target Storm component
- *
- * Partitioning is already done in 
[[io.gearpump.experiments.storm.util.StormOutputCollector]] and
- * kept in "targetPartitions" of 
[[io.gearpump.experiments.storm.topology.GearpumpTuple]]
- * the partitioner just returns the partitions of the target
- *
- * In Gearpump, a message is sent from a task to all the subscribers.
- * In Storm, however, a message is sent to one or more of the subscribers.
- * Hence, we have to do the partitioning in
- * [[io.gearpump.experiments.storm.util.StormOutputCollector]] till the Storm 
way is supported
- * in Gearpump
- *
- * @param target target storm component id
- */
-private[storm] class StormPartitioner(target: String) extends 
MulticastPartitioner {
-
-  override def getPartitions(msg: Message, partitionNum: Int, 
currentPartitionId: Int)
-    : Array[Int] = {
-    val stormTuple = msg.msg.asInstanceOf[GearpumpTuple]
-    stormTuple.targetPartitions.getOrElse(target, 
Array(Partitioner.UNKNOWN_PARTITION_ID))
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
deleted file mode 100644
index 5920bf8..0000000
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.processor
-
-import java.util.{Collection => JCollection, List => JList}
-
-import backtype.storm.task.IOutputCollector
-import backtype.storm.tuple.Tuple
-
-import io.gearpump.experiments.storm.topology.TimedTuple
-import io.gearpump.experiments.storm.util.StormConstants._
-import io.gearpump.experiments.storm.util.StormOutputCollector
-import io.gearpump.streaming.task.ReportCheckpointClock
-
-/**
- * this is used by Storm bolt to emit messages
- */
-private[storm] class StormBoltOutputCollector(collector: StormOutputCollector,
-    ackEnabled: Boolean = false) extends IOutputCollector {
-  private var reportTime = 0L
-  private var maxAckTime = 0L
-
-  override def emit(
-      streamId: String, anchors: JCollection[Tuple], tuple: JList[AnyRef]): 
JList[Integer] = {
-    collector.emit(streamId, tuple)
-  }
-
-  override def emitDirect(
-      taskId: Int, streamId: String, anchors: JCollection[Tuple], tuple: 
JList[AnyRef]): Unit = {
-    collector.emitDirect(taskId, streamId, tuple)
-  }
-
-  override def fail(tuple: Tuple): Unit = {
-    // application failure, throw exception such that the tuple can be replayed
-    // Note: do not print the tuple which will trigger NPE since its messageId 
is null
-    throw new Exception("Storm Bolt.execute failed")
-  }
-
-  override def ack(tuple: Tuple): Unit = {
-    if (ackEnabled) {
-      tuple match {
-        case timedTuple: TimedTuple =>
-          maxAckTime = Math.max(maxAckTime, timedTuple.timestamp)
-          val taskContext = collector.taskContext
-          val upstreamMinClock = taskContext.upstreamMinClock
-          if (reportTime <= upstreamMinClock && upstreamMinClock <= 
maxAckTime) {
-            reportTime = upstreamMinClock / CHECKPOINT_INTERVAL_MILLIS * 
CHECKPOINT_INTERVAL_MILLIS
-            taskContext.appMaster ! ReportCheckpointClock(taskContext.taskId, 
reportTime)
-            reportTime += CHECKPOINT_INTERVAL_MILLIS
-          }
-        case _ =>
-        // ignore other tuples
-      }
-    }
-  }
-
-  override def reportError(throwable: Throwable): Unit = {
-    throw throwable
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormProcessor.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormProcessor.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormProcessor.scala
deleted file mode 100644
index 67548b3..0000000
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormProcessor.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.processor
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.Duration
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import 
io.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt
-import io.gearpump.experiments.storm.util._
-import io.gearpump.streaming.task._
-
-object StormProcessor {
-  private[storm] val TICK = Message("tick")
-}
-
-/**
- * this is runtime container for Storm bolt
- */
-private[storm] class StormProcessor(gearpumpBolt: GearpumpBolt,
-    taskContext: TaskContext, conf: UserConfig)
-  extends Task(taskContext, conf) {
-  import io.gearpump.experiments.storm.processor.StormProcessor._
-
-  def this(taskContext: TaskContext, conf: UserConfig) = {
-    this(StormUtil.getGearpumpStormComponent(taskContext, 
conf)(taskContext.system)
-      .asInstanceOf[GearpumpBolt], taskContext, conf)
-  }
-
-  private val freqOpt = gearpumpBolt.getTickFrequency
-
-  override def onStart(startTime: StartTime): Unit = {
-    gearpumpBolt.start(startTime)
-    freqOpt.foreach(scheduleTick)
-  }
-
-  override def onNext(message: Message): Unit = {
-    message match {
-      case TICK =>
-        freqOpt.foreach { freq =>
-          gearpumpBolt.tick(freq)
-          scheduleTick(freq)
-        }
-      case _ =>
-        gearpumpBolt.next(message)
-    }
-  }
-
-  private def scheduleTick(freq: Int): Unit = {
-    taskContext.scheduleOnce(Duration(freq, TimeUnit.SECONDS)) {
-      self ! TICK
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormProducer.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormProducer.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormProducer.scala
deleted file mode 100644
index 39882c4..0000000
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormProducer.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.producer
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.Duration
-
-import akka.actor.Actor.Receive
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import 
io.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout
-import io.gearpump.experiments.storm.util._
-import io.gearpump.streaming.task._
-
-object StormProducer {
-  private[storm] val TIMEOUT = Message("timeout")
-}
-
-/**
- * this is runtime container for Storm spout
- */
-private[storm] class StormProducer(gearpumpSpout: GearpumpSpout,
-    taskContext: TaskContext, conf: UserConfig)
-  extends Task(taskContext, conf) {
-  import io.gearpump.experiments.storm.producer.StormProducer._
-
-  def this(taskContext: TaskContext, conf: UserConfig) = {
-    this(StormUtil.getGearpumpStormComponent(taskContext, 
conf)(taskContext.system)
-      .asInstanceOf[GearpumpSpout], taskContext, conf)
-  }
-
-  private val timeoutMillis = gearpumpSpout.getMessageTimeout
-
-  override def onStart(startTime: StartTime): Unit = {
-    gearpumpSpout.start(startTime)
-    if (gearpumpSpout.ackEnabled) {
-      getCheckpointClock
-    }
-    timeoutMillis.foreach(scheduleTimeout)
-    self ! Message("start")
-  }
-
-  override def onNext(msg: Message): Unit = {
-    msg match {
-      case TIMEOUT =>
-        timeoutMillis.foreach { timeout =>
-          gearpumpSpout.timeout(timeout)
-          scheduleTimeout(timeout)
-        }
-      case _ =>
-        gearpumpSpout.next(msg)
-    }
-    self ! Message("continue")
-  }
-
-  override def receiveUnManagedMessage: Receive = {
-    case CheckpointClock(optClock) =>
-      optClock.foreach { clock =>
-        gearpumpSpout.checkpoint(clock)
-      }
-      getCheckpointClock()
-  }
-
-  def getCheckpointClock(): Unit = {
-    
taskContext.scheduleOnce(Duration(StormConstants.CHECKPOINT_INTERVAL_MILLIS,
-      TimeUnit.MILLISECONDS))(taskContext.appMaster ! GetCheckpointClock)
-  }
-
-  private def scheduleTimeout(timeout: Long): Unit = {
-    taskContext.scheduleOnce(Duration(timeout, TimeUnit.MILLISECONDS)) {
-      self ! TIMEOUT
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
deleted file mode 100644
index 83532dc..0000000
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.producer
-
-import java.util.{List => JList}
-
-import backtype.storm.spout.{ISpout, ISpoutOutputCollector}
-
-import io.gearpump.TimeStamp
-import io.gearpump.experiments.storm.util.StormOutputCollector
-
-case class PendingMessage(id: Object, messageTime: TimeStamp, startTime: 
TimeStamp)
-
-/**
- * this is used by Storm Spout to emit messages
- */
-private[storm] class StormSpoutOutputCollector(
-    collector: StormOutputCollector, spout: ISpout, ackEnabled: Boolean)
-  extends ISpoutOutputCollector {
-
-  private var checkpointClock = 0L
-  private var pendingMessage: Option[PendingMessage] = None
-  private var nextPendingMessage: Option[PendingMessage] = None
-
-  override def emit(streamId: String, values: JList[AnyRef], messageId: 
Object): JList[Integer] = {
-    val curTime = System.currentTimeMillis()
-    collector.setTimestamp(curTime)
-    val outTasks = collector.emit(streamId, values)
-    setPendingOrAck(messageId, curTime, curTime)
-    outTasks
-  }
-
-  override def reportError(throwable: Throwable): Unit = {
-    throw throwable
-  }
-
-  override def emitDirect(taskId: Int, streamId: String, values: 
JList[AnyRef], messageId: Object)
-    : Unit = {
-    val curTime = System.currentTimeMillis()
-    collector.setTimestamp(curTime)
-    collector.emitDirect(taskId, streamId, values)
-    setPendingOrAck(messageId, curTime, curTime)
-  }
-
-  def ackPendingMessage(checkpointClock: TimeStamp): Unit = {
-    this.checkpointClock = checkpointClock
-    nextPendingMessage.foreach { case PendingMessage(_, messageTime, _) =>
-      if (messageTime <= this.checkpointClock) {
-        pendingMessage.foreach { case PendingMessage(id, _, _) =>
-          spout.ack(id)
-          reset()
-        }
-      }
-    }
-  }
-
-  def failPendingMessage(timeoutMillis: Long): Unit = {
-    pendingMessage.foreach { case PendingMessage(id, _, startTime) =>
-      if (System.currentTimeMillis() - startTime >= timeoutMillis) {
-        spout.fail(id)
-        reset()
-      }
-    }
-  }
-
-  private def reset(): Unit = {
-    pendingMessage = nextPendingMessage
-    nextPendingMessage = None
-  }
-
-  private def setPendingOrAck(messageId: Object, startTime: TimeStamp, 
messageTime: TimeStamp)
-    : Unit = {
-    if (ackEnabled) {
-      val newPendingMessage = PendingMessage(messageId, messageTime, startTime)
-      pendingMessage match {
-        case Some(msg) =>
-          if (nextPendingMessage.isEmpty && msg.messageTime <= 
this.checkpointClock) {
-            nextPendingMessage = Some(newPendingMessage)
-          } else {
-            spout.ack(messageId)
-          }
-        case None =>
-          pendingMessage = Some(newPendingMessage)
-      }
-    } else {
-      spout.ack(messageId)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
deleted file mode 100644
index 42dce5f..0000000
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
+++ /dev/null
@@ -1,388 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.topology
-
-import java.io.{File, FileOutputStream, IOException}
-import java.util
-import java.util.jar.JarFile
-import java.util.{HashMap => JHashMap, List => JList, Map => JMap}
-import scala.collection.JavaConverters._
-import scala.concurrent.{Await, Future}
-
-import akka.actor.ActorRef
-import akka.pattern.ask
-import backtype.storm.Config
-import backtype.storm.generated.{Bolt, ComponentCommon, SpoutSpec, 
StormTopology}
-import backtype.storm.metric.api.IMetric
-import backtype.storm.spout.{ISpout, SpoutOutputCollector}
-import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, 
TopologyContext}
-import backtype.storm.tuple.{Fields, Tuple, TupleImpl}
-import backtype.storm.utils.Utils
-import clojure.lang.Atom
-import org.apache.commons.io.{FileUtils, IOUtils}
-import org.slf4j.Logger
-
-import io.gearpump.experiments.storm.processor.StormBoltOutputCollector
-import io.gearpump.experiments.storm.producer.StormSpoutOutputCollector
-import io.gearpump.experiments.storm.util.StormConstants._
-import io.gearpump.experiments.storm.util.StormUtil._
-import io.gearpump.experiments.storm.util.{StormOutputCollector, StormUtil}
-import io.gearpump.streaming.DAG
-import io.gearpump.streaming.task._
-import io.gearpump.util.{Constants, LogUtil}
-import io.gearpump.{Message, TimeStamp}
-
-/**
- * subclass wraps Storm Spout and Bolt, and their lifecycles
- * hides the complexity from Gearpump applications
- */
-trait GearpumpStormComponent {
-  /**
-   * invoked at Task.onStart
-   * @param startTime task start time
-   */
-  def start(startTime: StartTime): Unit
-
-  /**
-   * invoked at Task.onNext
-   * @param message incoming message
-   */
-  def next(message: Message): Unit
-
-  /**
-   * invoked at Task.onStop
-   */
-  def stop(): Unit = {}
-}
-
-object GearpumpStormComponent {
-  private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpStormComponent])
-
-  object GearpumpSpout {
-    def apply(topology: StormTopology, config: JMap[AnyRef, AnyRef],
-        spoutSpec: SpoutSpec, taskContext: TaskContext): GearpumpSpout = {
-      val componentCommon = spoutSpec.get_common()
-      val scalaMap = config.asScala.toMap // Convert to scala immutable map
-      val normalizedConfig = normalizeConfig(scalaMap, componentCommon)
-      val getTopologyContext = (dag: DAG, taskId: TaskId) => {
-        val stormTaskId = gearpumpTaskIdToStorm(taskId)
-        buildTopologyContext(dag, topology, normalizedConfig, stormTaskId)
-      }
-      val spout = 
Utils.getSetComponentObject(spoutSpec.get_spout_object()).asInstanceOf[ISpout]
-      val ackEnabled = StormUtil.ackEnabled(config)
-      if (ackEnabled) {
-        val className = spout.getClass.getName
-        if (!isSequentiallyReplayableSpout(className)) {
-          LOG.warn(s"at least once is not supported for $className")
-        }
-      }
-      val getOutputCollector = (taskContext: TaskContext, topologyContext: 
TopologyContext) => {
-        new StormSpoutOutputCollector(
-          StormOutputCollector(taskContext, topologyContext), spout, 
ackEnabled)
-      }
-      GearpumpSpout(
-        normalizedConfig,
-        spout,
-        askAppMasterForDAG,
-        getTopologyContext,
-        getOutputCollector,
-        ackEnabled,
-        taskContext)
-    }
-
-    private def isSequentiallyReplayableSpout(className: String): Boolean = {
-      className.equals("storm.kafka.KafkaSpout")
-    }
-  }
-
-  case class GearpumpSpout(
-      config: JMap[AnyRef, AnyRef],
-      spout: ISpout,
-      getDAG: ActorRef => DAG,
-      getTopologyContext: (DAG, TaskId) => TopologyContext,
-      getOutputCollector: (TaskContext, TopologyContext) => 
StormSpoutOutputCollector,
-      ackEnabled: Boolean,
-      taskContext: TaskContext)
-    extends GearpumpStormComponent {
-
-    private var collector: StormSpoutOutputCollector = null
-
-    override def start(startTime: StartTime): Unit = {
-      import taskContext.{appMaster, taskId}
-
-      val dag = getDAG(appMaster)
-      val topologyContext = getTopologyContext(dag, taskId)
-      collector = getOutputCollector(taskContext, topologyContext)
-      spout.open(config, topologyContext, new SpoutOutputCollector(collector))
-    }
-
-    override def next(message: Message): Unit = {
-      spout.nextTuple()
-    }
-
-    /**
-     * @return timeout in milliseconds if enabled
-     */
-    def getMessageTimeout: Option[Long] = {
-      StormUtil.getBoolean(config, 
Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS).flatMap {
-        timeoutEnabled =>
-          if (timeoutEnabled) {
-            StormUtil.getInt(config, 
Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).map(_ * 1000L)
-          } else {
-            None
-          }
-      }
-    }
-
-    def checkpoint(clock: TimeStamp): Unit = {
-      collector.ackPendingMessage(clock)
-    }
-
-    def timeout(timeoutMillis: Long): Unit = {
-      collector.failPendingMessage(timeoutMillis)
-    }
-  }
-
-  object GearpumpBolt {
-    def apply(topology: StormTopology, config: JMap[AnyRef, AnyRef],
-        boltSpec: Bolt, taskContext: TaskContext): GearpumpBolt = {
-      val configAsScalaMap = config.asScala.toMap // Convert to scala 
immutable map
-      val normalizedConfig = normalizeConfig(configAsScalaMap, 
boltSpec.get_common())
-      val getTopologyContext = (dag: DAG, taskId: TaskId) => {
-        val stormTaskId = gearpumpTaskIdToStorm(taskId)
-        buildTopologyContext(dag, topology, normalizedConfig, stormTaskId)
-      }
-      val getGeneralTopologyContext = (dag: DAG) => {
-        buildGeneralTopologyContext(dag, topology, normalizedConfig)
-      }
-      val getOutputCollector = (taskContext: TaskContext, topologyContext: 
TopologyContext) => {
-        StormOutputCollector(taskContext, topologyContext)
-      }
-      val getTickTuple = (topologyContext: GeneralTopologyContext, freq: Int) 
=> {
-
-        val values = new util.ArrayList[Object] // To be compatible with Java 
interface
-        values.add(freq.asInstanceOf[java.lang.Integer])
-        new TupleImpl(topologyContext, values, SYSTEM_TASK_ID, 
SYSTEM_TICK_STREAM_ID, null)
-      }
-      GearpumpBolt(
-        normalizedConfig,
-        
Utils.getSetComponentObject(boltSpec.get_bolt_object()).asInstanceOf[IBolt],
-        askAppMasterForDAG,
-        getTopologyContext,
-        getGeneralTopologyContext,
-        getOutputCollector,
-        getTickTuple,
-        taskContext)
-    }
-  }
-
-  case class GearpumpBolt(
-      config: JMap[AnyRef, AnyRef],
-      bolt: IBolt,
-      getDAG: ActorRef => DAG,
-      getTopologyContext: (DAG, TaskId) => TopologyContext,
-      getGeneralTopologyContext: DAG => GeneralTopologyContext,
-      getOutputCollector: (TaskContext, TopologyContext) => 
StormOutputCollector,
-      getTickTuple: (GeneralTopologyContext, Int) => Tuple,
-      taskContext: TaskContext)
-    extends GearpumpStormComponent {
-    import taskContext.{appMaster, taskId}
-
-    private var collector: StormOutputCollector = null
-    private var topologyContext: TopologyContext = null
-    private var generalTopologyContext: GeneralTopologyContext = null
-    private var tickTuple: Tuple = null
-
-    override def start(startTime: StartTime): Unit = {
-      val dag = getDAG(appMaster)
-      topologyContext = getTopologyContext(dag, taskId)
-      generalTopologyContext = getGeneralTopologyContext(dag)
-      collector = getOutputCollector(taskContext, topologyContext)
-      val delegate = new StormBoltOutputCollector(collector, 
StormUtil.ackEnabled(config))
-      bolt.prepare(config, topologyContext, new OutputCollector(delegate))
-    }
-
-    override def next(message: Message): Unit = {
-      val timestamp = message.timestamp
-      collector.setTimestamp(timestamp)
-      
bolt.execute(message.msg.asInstanceOf[GearpumpTuple].toTuple(generalTopologyContext,
-        timestamp))
-    }
-
-    def getTickFrequency: Option[Int] = {
-      StormUtil.getInt(config, Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS)
-    }
-
-    /**
-     * invoked at TICK message when "topology.tick.tuple.freq.secs" is 
configured
-     * @param freq tick frequency
-     */
-    def tick(freq: Int): Unit = {
-      if (null == tickTuple) {
-        tickTuple = getTickTuple(generalTopologyContext, freq)
-      }
-      bolt.execute(tickTuple)
-    }
-  }
-
-  /**
-   * normalize general config with per component configs
-   * "topology.transactional.id" and "topology.tick.tuple.freq.secs"
-   * @param stormConfig general config for all components
-   * @param componentCommon common component parts
-   */
-  private def normalizeConfig(stormConfig: Map[AnyRef, AnyRef],
-      componentCommon: ComponentCommon): JMap[AnyRef, AnyRef] = {
-    val config: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
-    config.putAll(stormConfig.asJava)
-    val componentConfig = parseJsonStringToMap(componentCommon.get_json_conf())
-    Option(componentConfig.get(Config.TOPOLOGY_TRANSACTIONAL_ID))
-      .foreach(config.put(Config.TOPOLOGY_TRANSACTIONAL_ID, _))
-    Option(componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS))
-      .foreach(config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, _))
-    config
-  }
-
-  private def askAppMasterForDAG(appMaster: ActorRef): DAG = {
-    implicit val timeout = Constants.FUTURE_TIMEOUT
-    val dagFuture = (appMaster ? GetDAG).asInstanceOf[Future[DAG]]
-    Await.result(dagFuture, timeout.duration)
-  }
-
-  private def buildGeneralTopologyContext(dag: DAG, topology: StormTopology, 
stormConf: JMap[_, _])
-    : GeneralTopologyContext = {
-    val taskToComponent = getTaskToComponent(dag)
-    val componentToSortedTasks: JMap[String, JList[Integer]] =
-      getComponentToSortedTasks(taskToComponent)
-    val componentToStreamFields: JMap[String, JMap[String, Fields]] =
-      getComponentToStreamFields(topology)
-    new GeneralTopologyContext(
-      topology, stormConf, taskToComponent.asJava, componentToSortedTasks,
-      componentToStreamFields, null)
-  }
-
-  private def buildTopologyContext(
-      dag: DAG, topology: StormTopology, stormConf: JMap[_, _], stormTaskId: 
Integer)
-    : TopologyContext = {
-    val taskToComponent = getTaskToComponent(dag)
-    val componentToSortedTasks: JMap[String, JList[Integer]] =
-      getComponentToSortedTasks(taskToComponent)
-    val componentToStreamFields: JMap[String, JMap[String, Fields]] =
-      getComponentToStreamFields(topology)
-    val codeDir = mkCodeDir
-    val pidDir = mkPidDir
-
-    new TopologyContext(topology, stormConf, taskToComponent.asJava, 
componentToSortedTasks,
-      componentToStreamFields, null, codeDir, pidDir, stormTaskId, null, null, 
null, null,
-      new JHashMap[String, AnyRef], new JHashMap[Integer, JMap[Integer, 
JMap[String, IMetric]]],
-      new Atom(false))
-  }
-
-  private def getComponentToStreamFields(topology: StormTopology)
-    : JMap[String, JMap[String, Fields]] = {
-    val spouts = topology.get_spouts().asScala
-    val bolts = topology.get_bolts().asScala
-
-    val spoutFields = spouts.map {
-      case (id, component) => id -> 
getComponentToFields(component.get_common())
-    }
-
-    val boltFields = bolts.map {
-      case (id, component) => id -> 
getComponentToFields(component.get_common())
-    }
-
-    val systemFields = Map(SYSTEM_COMPONENT_ID ->
-      Map(SYSTEM_TICK_STREAM_ID -> new 
Fields(SYSTEM_COMPONENT_OUTPUT_FIELDS)).asJava)
-
-    (spoutFields ++ boltFields ++ systemFields).asJava
-  }
-
-  private def getComponentToFields(common: ComponentCommon): JMap[String, 
Fields] = {
-    val streams = common.get_streams.asScala
-    streams.map { case (sid, stream) =>
-      sid -> new Fields(stream.get_output_fields())
-    }.asJava
-  }
-
-  private def getComponentToSortedTasks(
-      taskToComponent: Map[Integer, String]): JMap[String, JList[Integer]] = {
-    taskToComponent.groupBy(_._2).map { case (component, map) =>
-      val sortedTasks = map.keys.toList.sorted.asJava
-      component -> sortedTasks
-    }.asJava
-  }
-
-  private def getTaskToComponent(dag: DAG): Map[Integer, String] = {
-    val taskToComponent = dag.processors.flatMap { case (processorId, 
processorDescription) =>
-      val parallelism = processorDescription.parallelism
-      val component = 
processorDescription.taskConf.getString(STORM_COMPONENT).get
-      (0 until parallelism).map(index =>
-        gearpumpTaskIdToStorm(TaskId(processorId, index)) -> component)
-    }
-    taskToComponent
-  }
-
-  // Workarounds to support storm ShellBolt
-  private def mkPidDir: String = {
-    val pidDir = FileUtils.getTempDirectoryPath + File.separator + "pid"
-    try {
-      FileUtils.forceMkdir(new File(pidDir))
-    } catch {
-      case ex: IOException =>
-        LOG.error(s"failed to create pid directory $pidDir")
-    }
-    pidDir
-  }
-
-  // a workaround to support storm ShellBolt
-  private def mkCodeDir: String = {
-    val jarPath = System.getProperty("java.class.path").split(":").last
-    val destDir = FileUtils.getTempDirectoryPath + File.separator + "storm"
-
-    try {
-      FileUtils.forceMkdir(new File(destDir))
-
-      val jar = new JarFile(jarPath)
-      val enumEntries = jar.entries().asScala
-      enumEntries.foreach { entry =>
-        val file = new File(destDir + File.separator + entry.getName)
-        if (!entry.isDirectory) {
-          file.getParentFile.mkdirs()
-
-          val is = jar.getInputStream(entry)
-          val fos = new FileOutputStream(file)
-          try {
-            IOUtils.copy(is, fos)
-          } catch {
-            case ex: IOException =>
-              LOG.error(s"failed to copy data from ${entry.getName} to 
${file.getName}")
-          } finally {
-            fos.close()
-            is.close()
-          }
-        }
-      }
-    } catch {
-      case ex: IOException =>
-        LOG.error(s"could not extract $destDir from $jarPath")
-    }
-
-    destDir + File.separator + "resources"
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
deleted file mode 100644
index 5c3fc3e..0000000
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.topology
-
-import java.lang.{Iterable => JIterable}
-import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, 
Map => JMap}
-
-import akka.actor.ActorSystem
-import backtype.storm.Config
-import backtype.storm.generated._
-import backtype.storm.utils.{ThriftTopologyUtils, Utils}
-import org.slf4j.Logger
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.experiments.storm.processor.StormProcessor
-import io.gearpump.experiments.storm.producer.StormProducer
-import io.gearpump.experiments.storm.util.StormConstants._
-import io.gearpump.experiments.storm.util.StormUtil
-import io.gearpump.experiments.storm.util.StormUtil._
-import io.gearpump.streaming.Processor
-import io.gearpump.streaming.task.Task
-import io.gearpump.util.LogUtil
-
-// TODO: Refactor this file, we should disable using of JavaConversions
-// scalastyle:off javaconversions
-import scala.collection.JavaConversions._
-// scalastyle:on javaconversions
-
-object GearpumpStormTopology {
-  private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpStormTopology])
-
-  def apply(
-      name: String,
-      topology: StormTopology,
-      appConfigInJson: String)(implicit system: ActorSystem): 
GearpumpStormTopology = {
-    new GearpumpStormTopology(
-      name,
-      topology,
-      Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]],
-      parseJsonStringToMap(appConfigInJson)
-    )
-  }
-}
-
-/**
- * this is a wrapper over Storm topology which
- * 1. merges Storm and Gearpump configs
- * 2. creates Gearpump processors
- * 3. provides interface for Gearpump applications to use Storm topology
- *
- * an implicit ActorSystem is required to create Gearpump processors
- * @param name topology name
- * @param topology Storm topology
- * @param sysConfig configs from "defaults.yaml" and custom config file
- * @param appConfig config submitted from user application
- */
-private[storm] class GearpumpStormTopology(
-    name: String,
-    topology: StormTopology,
-    sysConfig: JMap[AnyRef, AnyRef],
-    appConfig: JMap[AnyRef, AnyRef])(implicit system: ActorSystem) {
-
-  private val spouts = topology.get_spouts()
-  private val bolts = topology.get_bolts()
-  private val stormConfig = mergeConfigs(sysConfig, appConfig, 
getComponentConfigs(spouts, bolts))
-  private val spoutProcessors = spouts.map { case (id, spout) =>
-    id -> spoutToProcessor(id, spout, stormConfig.toMap)
-  }.toMap
-  private val boltProcessors = bolts.map { case (id, bolt) =>
-    id -> boltToProcessor(id, bolt, stormConfig.toMap)
-  }.toMap
-  private val allProcessors = spoutProcessors ++ boltProcessors
-
-  /**
-   * @return merged Storm config with priority
-   *         defaults.yaml < custom file config < application config < 
component config
-   */
-  def getStormConfig: JMap[AnyRef, AnyRef] = stormConfig
-
-  /**
-   * @return Storm components to Gearpump processors
-   */
-  def getProcessors: Map[String, Processor[Task]] = allProcessors
-
-  /**
-   * @param sourceId source component id
-   * @return target Storm components and Gearpump processors
-   */
-  def getTargets(sourceId: String): Map[String, Processor[Task]] = {
-    getTargets(sourceId, topology).map { case (targetId, _) =>
-      targetId -> boltProcessors(targetId)
-    }
-  }
-
-  /**
-   * merge configs from application, custom config file and component
-   */
-  private def mergeConfigs(
-      sysConfig: JMap[AnyRef, AnyRef],
-      appConfig: JMap[AnyRef, AnyRef],
-      componentConfigs: Iterable[JMap[AnyRef, AnyRef]]): JMap[AnyRef, AnyRef] 
= {
-    val allConfig = new JHashMap[AnyRef, AnyRef]
-    allConfig.putAll(sysConfig)
-    allConfig.putAll(appConfig)
-    allConfig.putAll(getMergedComponentConfig(componentConfigs, 
allConfig.toMap))
-    allConfig.put(Config.TOPOLOGY_NAME, name)
-    allConfig
-  }
-
-  /**
-   * creates Gearpump processor from Storm spout
-   * @param spoutId spout id
-   * @param spoutSpec spout spec
-   * @param stormConfig merged storm config
-   * @param system actor system
-   * @return a Processor[StormProducer]
-   */
-  private def spoutToProcessor(spoutId: String, spoutSpec: SpoutSpec,
-      stormConfig: Map[AnyRef, AnyRef])(implicit system: ActorSystem): 
Processor[Task] = {
-    val componentCommon = spoutSpec.get_common()
-    val taskConf = UserConfig.empty
-      .withString(STORM_COMPONENT, spoutId)
-    val parallelism = getParallelism(stormConfig, componentCommon)
-    Processor[StormProducer](parallelism, spoutId, taskConf)
-  }
-
-  /**
-   * creates Gearpump processor from Storm bolt
-   * @param boltId bolt id
-   * @param boltSpec bolt spec
-   * @param stormConfig merged storm config
-   * @param system actor system
-   * @return a Processor[StormProcessor]
-   */
-  private def boltToProcessor(boltId: String, boltSpec: Bolt,
-      stormConfig: Map[AnyRef, AnyRef])(implicit system: ActorSystem): 
Processor[Task] = {
-    val componentCommon = boltSpec.get_common()
-    val taskConf = UserConfig.empty
-      .withString(STORM_COMPONENT, boltId)
-      .withBoolean("state.checkpoint.enable", 
StormUtil.ackEnabled(stormConfig))
-    val parallelism = getParallelism(stormConfig, componentCommon)
-    Processor[StormProcessor](parallelism, boltId, taskConf)
-  }
-
-  /**
-   * @return target components and streams
-   */
-  private def getTargets(componentId: String, topology: StormTopology)
-    : Map[String, Map[String, Grouping]] = {
-    val componentIds = ThriftTopologyUtils.getComponentIds(topology)
-    componentIds.flatMap { otherComponentId =>
-      getInputs(otherComponentId, topology).toList.map(otherComponentId -> _)
-    }.foldLeft(Map.empty[String, Map[String, Grouping]]) {
-      (allTargets, componentAndInput) =>
-        val (otherComponentId, (globalStreamId, grouping)) = componentAndInput
-        val inputStreamId = globalStreamId.get_streamId()
-        val inputComponentId = globalStreamId.get_componentId
-        if (inputComponentId.equals(componentId)) {
-          val curr = allTargets.getOrElse(otherComponentId, Map.empty[String, 
Grouping])
-          allTargets + (otherComponentId -> (curr + (inputStreamId -> 
grouping)))
-        } else {
-          allTargets
-        }
-    }
-  }
-
-  /**
-   * @return input stream and grouping for a Storm component
-   */
-  private def getInputs(componentId: String, topology: StormTopology)
-    : JMap[GlobalStreamId, Grouping] = {
-    ThriftTopologyUtils.getComponentCommon(topology, componentId).get_inputs
-  }
-
-  /**
-   * get Storm component parallelism according to the following rule,
-   * 1. use "topology.tasks" if defined; otherwise use parallelism_hint
-   * 2. parallelism should not be larger than "topology.max.task.parallelism" 
if defined
-   * 3. component config overrides system config
-   * @param stormConfig System configs without merging "topology.tasks" and
-   *                    "topology.max.task.parallelism" of component
-   * @return number of task instances for a component
-   */
-  private def getParallelism(stormConfig: Map[AnyRef, AnyRef], component: 
ComponentCommon): Int = {
-    val parallelismHint: Int = if (component.is_set_parallelism_hint()) {
-      component.get_parallelism_hint()
-    } else {
-      1
-    }
-    val mergedConfig = new JHashMap[AnyRef, AnyRef]
-    val componentConfig = parseJsonStringToMap(component.get_json_conf)
-    mergedConfig.putAll(stormConfig)
-    mergedConfig.putAll(componentConfig)
-    val numTasks: Int = getInt(mergedConfig, 
Config.TOPOLOGY_TASKS).getOrElse(parallelismHint)
-    val parallelism: Int = getInt(mergedConfig, 
Config.TOPOLOGY_MAX_TASK_PARALLELISM)
-      .fold(numTasks)(p => math.min(p, numTasks))
-    parallelism
-  }
-
-  private def getComponentConfigs(spouts: JMap[String, SpoutSpec],
-      bolts: JMap[String, Bolt]): Iterable[JMap[AnyRef, AnyRef]] = {
-    spouts.map { case (id, spoutSpec) =>
-      parseJsonStringToMap(spoutSpec.get_common().get_json_conf())
-    } ++ bolts.map { case (id, boltSpec) =>
-      parseJsonStringToMap(boltSpec.get_common().get_json_conf())
-    }
-  }
-
-  /**
-   * merge component configs "topology.kryo.decorators" and 
"topology.kryo.register"
-   * @param componentConfigs list of component configs
-   * @param allConfig existing configs without merging component configs
-   * @return the two configs merged from all the component configs and 
existing configs
-   */
-  private def getMergedComponentConfig(componentConfigs: Iterable[JMap[AnyRef, 
AnyRef]],
-      allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = {
-    val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
-    mergedConfig.putAll(getMergedKryoDecorators(componentConfigs, allConfig))
-    mergedConfig.putAll(getMergedKryoRegister(componentConfigs, allConfig))
-    mergedConfig
-  }
-
-  /**
-   * @param componentConfigs list of component configs
-   * @param allConfig existing configs without merging component configs
-   * @return a merged config with a list of distinct kryo decorators from 
component and
-   *         existing configs
-   */
-  private def getMergedKryoDecorators(componentConfigs: Iterable[JMap[AnyRef, 
AnyRef]],
-      allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = {
-    val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef](1)
-    val key = Config.TOPOLOGY_KRYO_DECORATORS
-    val configs = getConfigValues(componentConfigs, allConfig, key)
-    val distincts = configs.foldLeft(Set.empty[String]) {
-      case (accum, config: JIterable[_]) =>
-        accum ++ config.map {
-          case s: String => s
-          case illegal =>
-            throw new IllegalArgumentException(s"$key must be a List of 
Strings; actually $illegal")
-        }
-      case (accum, null) =>
-        accum
-      case illegal =>
-        throw new IllegalArgumentException(s"$key must be a List of Strings; 
actually $illegal")
-    }
-    if (distincts.nonEmpty) {
-      val decorators: JList[String] = new JArrayList(distincts.size)
-      decorators.addAll(distincts)
-      mergedConfig.put(key, decorators)
-    }
-    mergedConfig
-  }
-
-  /**
-   * @param componentConfigs list of component configs
-   * @param allConfig existing configs without merging component configs
-   * @return a merged config with component config overriding existing configs
-   */
-  private def getMergedKryoRegister(componentConfigs: Iterable[JMap[AnyRef, 
AnyRef]],
-      allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = {
-    val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef](1)
-    val key = Config.TOPOLOGY_KRYO_REGISTER
-    val configs = getConfigValues(componentConfigs, allConfig, key)
-    val merged = configs.foldLeft(Map.empty[String, String]) {
-      case (accum, config: JIterable[_]) =>
-        accum ++ config.map {
-          case m: JMap[_, _] =>
-            m.map {
-              case (k: String, v: String) => k -> v
-              case illegal =>
-                throw new IllegalArgumentException(
-                  s"each element of $key must be a String or a Map of Strings; 
actually $illegal")
-            }
-          case s: String =>
-            Map(s -> null)
-          case illegal =>
-            throw new IllegalArgumentException(s"each element of $key must be 
a String or " +
-              s"a Map of Strings; actually $illegal")
-        }.reduce(_ ++ _)
-      case (accum, null) =>
-        accum
-      case (accum, illegal) =>
-        throw new IllegalArgumentException(
-          s"$key must be an Iterable containing only Strings or Maps of 
Strings; actually $illegal")
-    }
-    if (merged.nonEmpty) {
-      val registers: JMap[String, String] = new JHashMap[String, 
String](merged.size)
-      registers.putAll(merged)
-      mergedConfig.put(key, registers)
-    }
-    mergedConfig
-  }
-
-  /**
-   * @param componentConfigs list of raw component configs
-   * @param allConfig existing configs without merging component configs
-   * @param key config key
-   * @return a list of values for a config from both component configs and 
existing configs
-   */
-  private def getConfigValues(componentConfigs: Iterable[JMap[AnyRef, AnyRef]],
-      allConfig: Map[AnyRef, AnyRef], key: String): Iterable[AnyRef] = {
-    componentConfigs.map(config => config.get(key)) ++ 
allConfig.get(key).toList
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala
deleted file mode 100644
index ee54add..0000000
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.topology
-
-import java.util.{List => JList}
-
-import backtype.storm.task.GeneralTopologyContext
-import backtype.storm.tuple.{Tuple, TupleImpl}
-
-import io.gearpump.TimeStamp
-
-/**
- * this carries Storm tuple values in the Gearpump world
- * the targetPartitions field dictate which tasks a GearpumpTuple should be 
sent to
- * see [[io.gearpump.experiments.storm.partitioner.StormPartitioner]] for more 
info
- */
-private[storm] class GearpumpTuple(
-    val values: JList[AnyRef],
-    val sourceTaskId: Integer,
-    val sourceStreamId: String,
-    @transient val targetPartitions: Map[String, Array[Int]]) extends 
Serializable {
-  /**
-   * creates a Storm [[backtype.storm.tuple.Tuple]] to be passed to a Storm 
component
-   * this is needed for each incoming message
-   * because we cannot get [[backtype.storm.task.GeneralTopologyContext]] at 
deserialization
-   * @param topologyContext topology context used for all tasks
-   * @return a Tuple
-   */
-  def toTuple(topologyContext: GeneralTopologyContext, timestamp: TimeStamp): 
Tuple = {
-    TimedTuple(topologyContext, values, sourceTaskId, sourceStreamId, 
timestamp)
-  }
-
-  def canEqual(other: Any): Boolean = other.isInstanceOf[GearpumpTuple]
-
-  override def equals(other: Any): Boolean = other match {
-    case that: GearpumpTuple =>
-      (that canEqual this) &&
-        values == that.values &&
-        sourceTaskId == that.sourceTaskId &&
-        sourceStreamId == that.sourceStreamId
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    val state = Seq(values, sourceTaskId, sourceStreamId)
-    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
-  }
-}
-
-case class TimedTuple(topologyContext: GeneralTopologyContext, tuple: 
JList[AnyRef],
-    sourceTaskId: Integer, sourceStreamId: String, timestamp: TimeStamp)
-  extends TupleImpl(topologyContext, tuple, sourceTaskId, sourceStreamId, null)
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/GraphBuilder.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/GraphBuilder.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/GraphBuilder.scala
deleted file mode 100644
index 041a90c..0000000
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/GraphBuilder.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.util
-
-import io.gearpump.experiments.storm.partitioner.StormPartitioner
-import io.gearpump.experiments.storm.topology.GearpumpStormTopology
-import io.gearpump.partitioner.Partitioner
-import io.gearpump.streaming.Processor
-import io.gearpump.streaming.task.Task
-import io.gearpump.util.Graph
-
object GraphBuilder {

  /**
   * Builds a Gearpump DAG from a Storm topology.
   *
   * Every (source, target) processor pair connected in the Storm topology
   * becomes one edge, partitioned by a [[StormPartitioner]] for the target
   * component.
   *
   * @param topology a wrapper over a Storm topology
   * @return a DAG of processors connected by Storm partitioners
   */
  def build(topology: GearpumpStormTopology): Graph[Processor[_ <: Task], _ <: Partitioner] = {
    val processorGraph = Graph.empty[Processor[Task], Partitioner]
    for {
      (sourceId, sourceProcessor) <- topology.getProcessors
      (targetId, targetProcessor) <- topology.getTargets(sourceId)
    } {
      processorGraph.addEdge(sourceProcessor, new StormPartitioner(targetId), targetProcessor)
    }
    processorGraph
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala
deleted file mode 100644
index 91277cd..0000000
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.util
-
-import java.util.{List => JList}
-import scala.util.Random
-
-import backtype.storm.generated.GlobalStreamId
-import backtype.storm.grouping.CustomStreamGrouping
-import backtype.storm.task.TopologyContext
-import backtype.storm.tuple.Fields
-
/**
 * Grouper mirrors Storm's grouping strategies but returns Gearpump partitions
 * for Storm tuple values.
 */
sealed trait Grouper {
  /**
   * @param taskId storm task id of the source task
   * @param values storm tuple values
   * @return gearpump partitions the values should be sent to
   */
  def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int]
}
-
/**
 * GlobalGrouper routes every tuple to partition 0, i.e. a single target task.
 */
class GlobalGrouper extends Grouper {
  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = Array(0)
}
-
/**
 * NoneGrouper picks one random target partition per tuple.
 *
 * @param numTasks number of target tasks
 */
class NoneGrouper(numTasks: Int) extends Grouper {
  private val random = new Random

  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] =
    Array(StormUtil.mod(random.nextInt, numTasks))
}
-
/**
 * ShuffleGrouper deals the partitions out in a random order, one per call;
 * once a full round has been returned it reshuffles and starts a new round.
 *
 * @param numTasks number of target tasks
 */
class ShuffleGrouper(numTasks: Int) extends Grouper {
  private val random = new Random
  // position within the current round; -1 means no round has started yet
  private var index = -1
  // the current shuffled round of partitions; empty until the first call
  private var partitions = List.empty[Int]

  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
    index += 1
    if (partitions.isEmpty) {
      // first call ever (index is now 0): build and shuffle the initial round
      partitions = 0.until(numTasks).toList
      partitions = random.shuffle(partitions)
    } else if (index >= numTasks) {
      // round exhausted: reshuffle and start over
      index = 0
      partitions = random.shuffle(partitions)
    }
    Array(partitions(index))
  }
}
-
/**
 * FieldsGrouper hashes the values of the grouping fields, so tuples with equal
 * grouping-field values always land on the same partition.
 *
 * @param outFields declared output fields of the source task
 * @param groupFields grouping fields of the target tasks
 * @param numTasks number of target tasks
 */
class FieldsGrouper(outFields: Fields, groupFields: Fields, numTasks: Int) extends Grouper {

  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
    val selected = outFields.select(groupFields, values)
    Array(StormUtil.mod(selected.hashCode(), numTasks))
  }
}
-
/**
 * AllGrouper broadcasts: every tuple is sent to all target partitions.
 *
 * @param numTasks number of target tasks
 */
class AllGrouper(numTasks: Int) extends Grouper {
  // computed once; every call returns the same array of all partition indexes
  val partitions = Array.tabulate(numTasks)(i => i)

  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = partitions
}
-
/**
 * CustomGrouper delegates partition choice to a user-supplied
 * [[backtype.storm.grouping.CustomStreamGrouping]], translating the Storm task
 * ids it chooses back into Gearpump partition indexes.
 *
 * @param grouping see [[backtype.storm.grouping.CustomStreamGrouping]]
 */
class CustomGrouper(grouping: CustomStreamGrouping) extends Grouper {

  /** Forwards Storm's prepare callback to the wrapped grouping. */
  def prepare(
      topologyContext: TopologyContext, globalStreamId: GlobalStreamId,
      targetTasks: JList[Integer]): Unit = {
    grouping.prepare(topologyContext, globalStreamId, targetTasks)
  }

  override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = {
    val chosen = grouping.chooseTasks(taskId, values)
    // map each chosen Storm task id to the Gearpump partition index it encodes
    Array.tabulate(chosen.size()) { i =>
      StormUtil.stormTaskIdToGearpump(chosen.get(i)).index
    }
  }
}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormConstants.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormConstants.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormConstants.scala
deleted file mode 100644
index 928d07b..0000000
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormConstants.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.util
-
/** Keys and constants shared across the Storm-on-Gearpump compatibility layer. */
object StormConstants {
  // configuration keys carrying the Storm component / topology / config
  // (STORM_CONFIG is read back from UserConfig by StormSerializationFramework)
  val STORM_COMPONENT = "storm_component"
  val STORM_TOPOLOGY = "storm_topology"
  val STORM_CONFIG = "storm_config"
  // Storm's reserved system component and its tick stream
  val SYSTEM_COMPONENT_ID = "__system"
  val SYSTEM_COMPONENT_OUTPUT_FIELDS = "rate_secs"
  val SYSTEM_TASK_ID: Integer = -1
  val SYSTEM_TICK_STREAM_ID = "__tick"

  val CHECKPOINT_INTERVAL_MILLIS = 2000 // 2 seconds

  val STORM_SERIALIZATION_FRAMEWORK = "gearpump.storm.serialization-framework"
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala
deleted file mode 100644
index 74d1b2b..0000000
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.util
-
-import java.util.{ArrayList => JArrayList, Iterator => JIterator, List => 
JList, Map => JMap}
-import scala.collection.JavaConverters._
-
-import backtype.storm.generated.{GlobalStreamId, Grouping, JavaObject}
-import backtype.storm.grouping.CustomStreamGrouping
-import backtype.storm.task.TopologyContext
-import backtype.storm.tuple.Fields
-import backtype.storm.utils.Utils
-import org.slf4j.Logger
-
-import io.gearpump._
-import io.gearpump.experiments.storm.topology.GearpumpTuple
-import io.gearpump.experiments.storm.util.StormUtil._
-import io.gearpump.streaming.ProcessorId
-import io.gearpump.streaming.task.{TaskContext, TaskId}
-import io.gearpump.util.LogUtil
-
object StormOutputCollector {
  private val LOG: Logger = LogUtil.getLogger(classOf[StormOutputCollector])

  // shared empty result returned for streams that have no targets
  private[storm] val EMPTY_LIST: JList[Integer] = new JArrayList[Integer](0)

  /**
   * Creates a StormOutputCollector for a Gearpump task, pre-resolving one
   * [[Grouper]] per outgoing stream from the component's non-direct groupings.
   */
  def apply(taskContext: TaskContext, topologyContext: TopologyContext): StormOutputCollector = {
    val stormTaskId = topologyContext.getThisTaskId
    val componentId = topologyContext.getThisComponentId
    val taskToComponent = topologyContext.getTaskToComponent
    val componentToProcessorId = getComponentToProcessorId(taskToComponent.asScala.toMap)
    val targets = topologyContext.getTargets(componentId)
    // direct groupings are excluded here; they are served by emitDirect instead.
    // NOTE(review): this map is keyed by streamId only, so when one stream has
    // several non-direct target components only the last grouper is kept and
    // reused for every target of that stream — confirm this is intended.
    val streamGroupers: Map[String, Grouper] =
      targets.asScala.flatMap { case (streamId, targetGrouping) =>
        targetGrouping.asScala.collect { case (target, grouping) if !grouping.is_set_direct() =>
          streamId -> getGrouper(topologyContext, grouping, componentId, streamId, target)
        }
      }.toMap
    val getTargetPartitionsFn = (streamId: String, values: JList[AnyRef]) => {
      getTargetPartitions(stormTaskId, streamId, targets,
        streamGroupers, componentToProcessorId, values)
    }
    new StormOutputCollector(stormTaskId, taskToComponent, targets, getTargetPartitionsFn,
      taskContext, LatestTime)
  }

  /**
   * Gets, for a single emit on `streamId`, the target Gearpump partitions per
   * target component along with the flat list of corresponding Storm task ids.
   *
   * The caller must have checked that `targets` contains `streamId`; otherwise
   * `targets.get(streamId)` is null and this throws a NullPointerException.
   */
  private def getTargetPartitions(
      stormTaskId: Int,
      streamId: String,
      targets: JMap[String, JMap[String, Grouping]],
      streamGroupers: Map[String, Grouper],
      componentToProcessorId: Map[String, ProcessorId],
      values: JList[AnyRef]): (Map[String, Array[Int]], JList[Integer]) = {
    val ret: JList[Integer] = new JArrayList[Integer](targets.size)

    @annotation.tailrec
    def getRecur(iter: JIterator[String],
        accum: Map[String, Array[Int]]): Map[String, Array[Int]] = {
      if (iter.hasNext) {
        val target = iter.next
        val grouper = streamGroupers(streamId)
        val partitions = grouper.getPartitions(stormTaskId, values)
        partitions.foreach { p =>
          // renamed from `stormTaskId`, which shadowed the source-task parameter
          val targetStormTaskId = gearpumpTaskIdToStorm(TaskId(componentToProcessorId(target), p))
          ret.add(targetStormTaskId)
        }
        getRecur(iter, accum + (target -> partitions))
      } else {
        accum
      }
    }
    val targetPartitions = getRecur(targets.get(streamId).keySet().iterator,
      Map.empty[String, Array[Int]])
    (targetPartitions, ret)
  }

  /** Maps each component id to the Gearpump processor id its tasks belong to. */
  private def getComponentToProcessorId(taskToComponent: Map[Integer, String])
    : Map[String, ProcessorId] = {
    taskToComponent.map { case (id, component) =>
      component -> stormTaskIdToGearpump(id).processorId
    }
  }

  /**
   * Translates a Storm [[Grouping]] into the matching [[Grouper]].
   * Direct grouping is rejected; it must be routed through emitDirect.
   */
  private def getGrouper(topologyContext: TopologyContext, grouping: Grouping,
      source: String, streamId: String, target: String): Grouper = {
    val outFields = topologyContext.getComponentOutputFields(source, streamId)
    val targetTasks = topologyContext.getComponentTasks(target)
    val targetTaskNum = targetTasks.size
    val globalStreamId = new GlobalStreamId(source, streamId)

    grouping.getSetField match {
      case Grouping._Fields.FIELDS =>
        // a fields grouping with no fields is Storm's encoding of global grouping
        if (isGlobalGrouping(grouping)) {
          new GlobalGrouper
        } else {
          new FieldsGrouper(outFields, new Fields(grouping.get_fields()), targetTaskNum)
        }
      case Grouping._Fields.SHUFFLE =>
        new ShuffleGrouper(targetTaskNum)
      case Grouping._Fields.NONE =>
        new NoneGrouper(targetTaskNum)
      case Grouping._Fields.ALL =>
        new AllGrouper(targetTaskNum)
      case Grouping._Fields.CUSTOM_SERIALIZED =>
        val customGrouping = Utils.javaDeserialize(grouping.get_custom_serialized,
          classOf[Serializable]).asInstanceOf[CustomStreamGrouping]
        val grouper = new CustomGrouper(customGrouping)
        grouper.prepare(topologyContext, globalStreamId, targetTasks)
        grouper
      case Grouping._Fields.CUSTOM_OBJECT =>
        val customObject = grouping.get_custom_object()
        val customGrouping = instantiateJavaObject(customObject)
        val grouper = new CustomGrouper(customGrouping)
        grouper.prepare(topologyContext, globalStreamId, targetTasks)
        grouper
      case Grouping._Fields.LOCAL_OR_SHUFFLE =>
        // Gearpump has built-in support for sending messages to local actors,
        // so a plain shuffle is sufficient here
        new ShuffleGrouper(targetTaskNum)
      case Grouping._Fields.DIRECT =>
        throw new Exception("direct grouping should not be called here")
    }
  }

  /** Storm encodes global grouping as a fields grouping with an empty field list. */
  private def isGlobalGrouping(grouping: Grouping): Boolean = {
    grouping.getSetField == Grouping._Fields.FIELDS &&
      grouping.get_fields.isEmpty
  }

  /**
   * Reflectively instantiates the CustomStreamGrouping described by a Thrift
   * [[JavaObject]] (fully-qualified class name plus constructor arguments).
   */
  private def instantiateJavaObject(javaObject: JavaObject): CustomStreamGrouping = {
    val className = javaObject.get_full_class_name()
    val args = javaObject.get_args_list().asScala.map(_.getFieldValue)
    // BUG FIX: spread the arguments as varargs. Previously the whole collection
    // was passed to newInstance as a single argument, which cannot match the
    // constructor that was looked up with the individual argument classes.
    // NOTE(review): getConstructor matches the boxed runtime classes of the
    // args; constructors declaring primitive parameters will not be found.
    Class.forName(className)
      .getConstructor(args.map(_.getClass): _*)
      .newInstance(args: _*)
      .asInstanceOf[CustomStreamGrouping]
  }
}
-
/**
 * Provides common functionality for
 * [[io.gearpump.experiments.storm.producer.StormSpoutOutputCollector]]
 * and [[io.gearpump.experiments.storm.processor.StormBoltOutputCollector]]
 */
class StormOutputCollector(
    stormTaskId: Int,
    taskToComponent: JMap[Integer, String],
    targets: JMap[String, JMap[String, Grouping]],
    getTargetPartitionsFn: (String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer]),
    val taskContext: TaskContext,
    private var timestamp: TimeStamp) {
  import io.gearpump.experiments.storm.util.StormOutputCollector._

  /**
   * Emits tuple values into a stream (invoked by a Storm output collector).
   *
   * Wraps the values into a [[GearpumpTuple]] message together with the target
   * partitions, which tell
   * [[io.gearpump.experiments.storm.partitioner.StormPartitioner]] where to
   * send it. The corresponding target Storm task ids are returned to the
   * collector. Streams with no targets yield the shared empty list.
   *
   * @param streamId Storm stream id
   * @param values Storm tuple values
   * @return target Storm task ids
   */
  def emit(streamId: String, values: JList[AnyRef]): JList[Integer] = {
    if (!targets.containsKey(streamId)) {
      EMPTY_LIST
    } else {
      val (targetPartitions, targetStormTaskIds) = getTargetPartitionsFn(streamId, values)
      send(streamId, values, targetPartitions)
      targetStormTaskIds
    }
  }

  /**
   * Emits tuple values to one specific Storm task (invoked by a Storm output
   * collector).
   *
   * The Storm task id is translated into a Gearpump TaskId whose partition is
   * communicated to
   * [[io.gearpump.experiments.storm.partitioner.StormPartitioner]] through the
   * targetPartitions field of the emitted [[GearpumpTuple]].
   *
   * @param id Storm task id
   * @param streamId Storm stream id
   * @param values Storm tuple values
   */
  def emitDirect(id: Int, streamId: String, values: JList[AnyRef]): Unit = {
    if (targets.containsKey(streamId)) {
      val targetComponent = taskToComponent.get(id)
      val partition = stormTaskIdToGearpump(id).index
      send(streamId, values, Map(targetComponent -> Array(partition)))
    }
  }

  // wraps the values into a GearpumpTuple and hands it to the Gearpump task
  // context, stamped with the current timestamp
  private def send(streamId: String, values: JList[AnyRef],
      targetPartitions: Map[String, Array[Int]]): Unit = {
    val tuple = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions)
    taskContext.output(Message(tuple, timestamp))
  }

  /**
   * Sets the timestamp taken from each incoming Message if not attached.
   */
  def setTimestamp(timestamp: TimeStamp): Unit = {
    this.timestamp = timestamp
  }

  def getTimestamp: Long = timestamp
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormSerializationFramework.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormSerializationFramework.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormSerializationFramework.scala
deleted file mode 100644
index f1c7bab..0000000
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormSerializationFramework.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.storm.util
-
-import java.lang.{Integer => JInteger}
-import java.util.{Map => JMap}
-
-import akka.actor.ExtendedActorSystem
-import backtype.storm.serialization.SerializationFactory
-import backtype.storm.utils.ListDelegate
-import com.esotericsoftware.kryo.Kryo
-import com.esotericsoftware.kryo.io.{Input, Output}
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.experiments.storm.topology.GearpumpTuple
-import io.gearpump.experiments.storm.util.StormConstants._
-import io.gearpump.serializer.{SerializationFramework, Serializer}
-
/**
 * A [[io.gearpump.serializer.SerializationFramework]] backed by Storm's
 * Kryo-based [[backtype.storm.serialization.SerializationFactory]].
 *
 * One [[StormSerializer]] is created lazily per thread, since a Kryo instance
 * (and the serializer's reused buffers) must not be shared between threads.
 */
class StormSerializationFramework extends SerializationFramework {
  // both fields are populated by init(); null until then
  private var stormConfig: JMap[AnyRef, AnyRef] = null
  private var pool: ThreadLocal[Serializer] = null

  override def init(system: ExtendedActorSystem, config: UserConfig): Unit = {
    implicit val actorSystem = system
    // the Storm config must have been stored under STORM_CONFIG; `get` throws
    // if it is missing, failing fast at startup
    stormConfig = config.getValue[JMap[AnyRef, AnyRef]](STORM_CONFIG).get
    pool = new ThreadLocal[Serializer]() {
      override def initialValue(): Serializer = {
        val kryo = SerializationFactory.getKryo(stormConfig)
        new StormSerializer(kryo)
      }
    }
  }

  override def get(): Serializer = {
    pool.get()
  }
}
-
-/**
- * serializes / deserializes 
[[io.gearpump.experiments.storm.topology.GearpumpTuple]]
- *
- * @param kryo created by Storm 
[[backtype.storm.serialization.SerializationFactory]]
- */
/**
 * serializes / deserializes [[io.gearpump.experiments.storm.topology.GearpumpTuple]]
 *
 * Wire format: sourceTaskId (int), sourceStreamId (string), then the tuple
 * values written through Storm's ListDelegate. targetPartitions is not
 * serialized; it is null on a deserialized tuple.
 *
 * Not thread-safe: the Output/Input buffers are reused across calls, so every
 * thread needs its own instance.
 *
 * @param kryo created by Storm [[backtype.storm.serialization.SerializationFactory]]
 */
class StormSerializer(kryo: Kryo) extends Serializer {
  // initial buffer of 4096 bytes; -1 means the max buffer size is 2147483647
  private val output = new Output(4096, -1)
  private val input = new Input

  override def serialize(message: Any): Array[Byte] = {
    val tuple = message.asInstanceOf[GearpumpTuple]
    output.clear()
    output.writeInt(tuple.sourceTaskId)
    output.writeString(tuple.sourceStreamId)
    // wrap the values in ListDelegate — presumably so Kryo picks up Storm's
    // registered serializer for it; confirm against SerializationFactory
    val listDelegate = new ListDelegate
    listDelegate.setDelegate(tuple.values)
    kryo.writeObject(output, listDelegate)
    output.toBytes
  }

  override def deserialize(msg: Array[Byte]): Any = {
    input.setBuffer(msg)
    // read order must mirror the write order in serialize exactly
    val sourceTaskId: JInteger = input.readInt
    val sourceStreamId: String = input.readString
    val listDelegate = kryo.readObject[ListDelegate](input, classOf[ListDelegate])
    new GearpumpTuple(listDelegate.getDelegate, sourceTaskId, sourceStreamId, null)
  }
}


Reply via email to