http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
index dbbe738..aca2736 100644
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
+++ 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,28 +21,29 @@ package io.gearpump.experiments.storm.main
 import java.io.{File, FileOutputStream, FileWriter}
 import java.nio.ByteBuffer
 import java.nio.channels.{Channels, WritableByteChannel}
-import java.util.{Map => JMap, HashMap => JHashMap, UUID}
+import java.util.{HashMap => JHashMap, Map => JMap, UUID}
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
 
 import akka.actor.ActorSystem
+import com.typesafe.config.ConfigValueFactory
 import backtype.storm.Config
 import backtype.storm.generated._
 import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer}
-import backtype.storm.utils.TimeCacheMap.ExpiredCallback
-import backtype.storm.utils.{TimeCacheMap, Utils}
-import com.typesafe.config.ConfigValueFactory
-import io.gearpump.cluster.{MasterToAppMaster, UserConfig}
+import backtype.storm.utils.Utils
+import org.apache.storm.shade.org.json.simple.JSONValue
+import org.apache.storm.shade.org.yaml.snakeyaml.Yaml
+import org.slf4j.Logger
+
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import io.gearpump.cluster.{MasterToAppMaster, UserConfig}
 import io.gearpump.experiments.storm.topology.GearpumpStormTopology
-import io.gearpump.experiments.storm.util.{GraphBuilder, StormConstants, 
StormUtil}
+import io.gearpump.experiments.storm.util.TimeCacheMapWrapper.Callback
+import io.gearpump.experiments.storm.util.{GraphBuilder, StormConstants, 
StormUtil, TimeCacheMapWrapper}
 import io.gearpump.streaming.StreamApplication
 import io.gearpump.util.{AkkaApp, Constants, LogUtil}
-import org.apache.storm.shade.org.json.simple.JSONValue
-import org.apache.storm.shade.org.yaml.snakeyaml.Yaml
-import org.slf4j.Logger
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Future
 
 object GearpumpNimbus extends AkkaApp with ArgumentsParser {
   private val THRIFT_PORT = StormUtil.getThriftPort()
@@ -50,7 +51,8 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser {
   private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpNimbus])
 
   override val options: Array[(String, CLIOption[Any])] = Array(
-    OUTPUT -> CLIOption[String]("<output path for configuration file>", 
required = false, defaultValue = Some("app.yaml"))
+    OUTPUT -> CLIOption[String]("<output path for configuration file>",
+      required = false, defaultValue = Some("app.yaml"))
   )
 
   override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
@@ -72,11 +74,13 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser {
       val thriftServer = createServer(clientContext, stormConf)
       thriftServer.serve()
     }
-    system.awaitTermination()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
-  private def createServer(clientContext: ClientContext, stormConf: 
JMap[AnyRef, AnyRef]): ThriftServer = {
-    val processor = new Nimbus.Processor[GearpumpNimbus](new 
GearpumpNimbus(clientContext, stormConf))
+  private def createServer(
+      clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]): 
ThriftServer = {
+    val processor = new Nimbus.Processor[GearpumpNimbus](new 
GearpumpNimbus(clientContext,
+      stormConf))
     val connectionType = ThriftConnectionType.NIMBUS
     new ThriftServer(stormConf, processor, connectionType)
   }
@@ -98,43 +102,56 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser 
{
     }
   }
 
-  import Constants._
+  import io.gearpump.util.Constants._
   private def updateClientConfig(config: Config): Config = {
     val storm = s"<${GEARPUMP_HOME}>/lib/storm/*"
-    val appClassPath = s"$storm${File.pathSeparator}" + 
config.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH)
-    val executorClassPath = s"$storm${File.pathSeparator}" + 
config.getString(Constants.GEARPUMP_EXECUTOR_EXTRA_CLASSPATH)
+    val appClassPath = s"$storm${File.pathSeparator}" +
+      config.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH)
+    val executorClassPath = s"$storm${File.pathSeparator}" +
+      config.getString(Constants.GEARPUMP_EXECUTOR_EXTRA_CLASSPATH)
 
-    val updated = config.withValue(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, 
ConfigValueFactory.fromAnyRef(appClassPath))
-        .withValue(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH, 
ConfigValueFactory.fromAnyRef(executorClassPath))
+    val updated = config
+      .withValue(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, 
ConfigValueFactory.fromAnyRef(appClassPath))
+      .withValue(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH,
+        ConfigValueFactory.fromAnyRef(executorClassPath))
 
     if (config.hasPath(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) {
-      val serializerConfig = 
ConfigValueFactory.fromAnyRef(config.getString(StormConstants.STORM_SERIALIZATION_FRAMEWORK))
+      val serializerConfig = ConfigValueFactory.fromAnyRef(
+        config.getString(StormConstants.STORM_SERIALIZATION_FRAMEWORK))
       updated.withValue(GEARPUMP_SERIALIZER_POOL, serializerConfig)
     } else {
       updated
     }
   }
-
 }
 
-class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, 
AnyRef]) extends Nimbus.Iface {
+class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, 
AnyRef])
+  extends Nimbus.Iface {
+
   import io.gearpump.experiments.storm.main.GearpumpNimbus._
 
   private var applications = Map.empty[String, Int]
   private var topologies = Map.empty[String, TopologyData]
-  private val expireSeconds = StormUtil.getInt(stormConf, 
Config.NIMBUS_FILE_COPY_EXPIRATION_SECS).get
-  private val expiredCallback = new ExpiredCallback[String, 
WritableByteChannel] {
+  private val expireSeconds = StormUtil.getInt(stormConf,
+    Config.NIMBUS_FILE_COPY_EXPIRATION_SECS).get
+  private val expiredCallback = new Callback[String, WritableByteChannel] {
     override def expire(k: String, v: WritableByteChannel): Unit = {
       v.close()
     }
   }
-  private val fileCacheMap = new TimeCacheMap[String, 
WritableByteChannel](expireSeconds, expiredCallback)
-
-  override def submitTopology(name: String, uploadedJarLocation: String, 
jsonConf: String, topology: StormTopology): Unit = {
-    submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, new 
SubmitOptions(TopologyInitialStatus.ACTIVE))
+  private val fileCacheMap = new TimeCacheMapWrapper[String, 
WritableByteChannel](expireSeconds,
+    expiredCallback)
+
+  override def submitTopology(
+      name: String, uploadedJarLocation: String, jsonConf: String, topology: 
StormTopology)
+    : Unit = {
+    submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology,
+      new SubmitOptions(TopologyInitialStatus.ACTIVE))
   }
 
-  override def submitTopologyWithOpts(name: String, uploadedJarLocation: 
String, jsonConf: String, topology: StormTopology, options: SubmitOptions): 
Unit = {
+  override def submitTopologyWithOpts(
+      name: String, uploadedJarLocation: String,
+      jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit 
= {
     LOG.info(s"Submitted topology $name")
     implicit val system = clientContext.system
     val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf)
@@ -142,8 +159,8 @@ class GearpumpNimbus(clientContext: ClientContext, 
stormConf: JMap[AnyRef, AnyRe
     val workerNum = StormUtil.getInt(stormConfig, 
Config.TOPOLOGY_WORKERS).getOrElse(1)
     val processorGraph = GraphBuilder.build(gearpumpStormTopology)
     val config = UserConfig.empty
-          .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology)
-          .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, 
stormConfig)
+      .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology)
+      .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, 
stormConfig)
     val app = StreamApplication(name, processorGraph, config)
     LOG.info(s"jar file uploaded to $uploadedJarLocation")
     val appId = clientContext.submit(app, uploadedJarLocation, workerNum)
@@ -167,13 +184,13 @@ class GearpumpNimbus(clientContext: ClientContext, 
stormConf: JMap[AnyRef, AnyRe
   }
 
   override def getTopology(name: String): StormTopology = {
-    updateApps
+    updateApps()
     topologies.getOrElse(name,
       throw new RuntimeException(s"topology $name not found")).topology
   }
 
   override def getTopologyConf(name: String): String = {
-    updateApps
+    updateApps()
     JSONValue.toJSONString(topologies.getOrElse(name,
       throw new RuntimeException(s"topology $name not found")).config)
   }
@@ -210,7 +227,7 @@ class GearpumpNimbus(clientContext: ClientContext, 
stormConf: JMap[AnyRef, AnyRe
   }
 
   override def getClusterInfo: ClusterSummary = {
-    updateApps
+    updateApps()
     val topologySummaryList = topologies.map { case (name, _) =>
       new TopologySummary(name, name, 0, 0, 0, 0, "")
     }.toSeq
@@ -224,7 +241,7 @@ class GearpumpNimbus(clientContext: ClientContext, 
stormConf: JMap[AnyRef, AnyRe
   override def uploadNewCredentials(s: String, credentials: Credentials): Unit 
= {
     throw new UnsupportedOperationException
   }
-    override def activate(name: String): Unit = {
+  override def activate(name: String): Unit = {
     throw new UnsupportedOperationException
   }
 
@@ -250,7 +267,7 @@ class GearpumpNimbus(clientContext: ClientContext, 
stormConf: JMap[AnyRef, AnyRe
     throw new UnsupportedOperationException
   }
 
-  private def updateApps: Unit = {
+  private def updateApps(): Unit = {
     clientContext.listApps.appMasters.foreach { app =>
       val name = app.appName
       if (applications.contains(name)) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
index d4c0e8a..fbdd579 100644
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
+++ 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,6 +20,7 @@ package io.gearpump.experiments.storm.main
 
 import backtype.storm.Config
 import backtype.storm.utils.Utils
+
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
 import io.gearpump.util.Constants._
 import io.gearpump.util.{AkkaApp, LogUtil, Util}
@@ -29,14 +30,15 @@ object GearpumpStormClient extends AkkaApp with 
ArgumentsParser {
   override val options: Array[(String, CLIOption[Any])] = Array(
     "jar" -> CLIOption[String]("<storm jar>", required = true),
     "config" -> CLIOption[Int]("<storm config file>", required = true),
-    "verbose" -> CLIOption("<print verbose log on console>", required = false, 
defaultValue = Some(false)))
+    "verbose" -> CLIOption("<print verbose log on console>", required = false,
+      defaultValue = Some(false)))
 
   override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
     val config = parse(args)
 
     val verbose = config.getBoolean("verbose")
     if (verbose) {
-      LogUtil.verboseLogToConsole
+      LogUtil.verboseLogToConsole()
     }
 
     val jar = config.getString("jar")
@@ -54,11 +56,12 @@ object GearpumpStormClient extends AkkaApp with 
ArgumentsParser {
     val classPath = Array(s"$gearpumpHome/lib/*", 
s"$gearpumpHome/lib/storm/*", jar)
     val process = Util.startProcess(stormOptions, classPath, topology, 
stormArgs)
 
-    // wait till the process exit
+    // Waits till the process exit
     val exit = process.exitValue()
 
     if (exit != 0) {
-      throw new Exception(s"failed to submit jar, exit code $exit, error 
summary: ${process.logger.error}")
+      throw new Exception(s"failed to submit jar, exit code $exit, " +
+        s"error summary: ${process.logger.error}")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala
index 04ccb91..e47e11d 100644
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala
+++ 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/partitioner/StormPartitioner.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,25 +20,27 @@ package io.gearpump.experiments.storm.partitioner
 
 import io.gearpump.Message
 import io.gearpump.experiments.storm.topology.GearpumpTuple
-import io.gearpump.experiments.storm.util.StormOutputCollector
-import io.gearpump.partitioner.{Partitioner, MulticastPartitioner}
+import io.gearpump.partitioner.{MulticastPartitioner, Partitioner}
 
 /**
- * this is a partitioner bound to a target Storm component
- * partitioning is already done in [[StormOutputCollector]] and
- * kept in "targetPartitions" of [[GearpumpTuple]]
+ * Partitioner bound to a target Storm component
+ *
+ * Partitioning is already done in 
[[io.gearpump.experiments.storm.util.StormOutputCollector]] and
+ * kept in "targetPartitions" of 
[[io.gearpump.experiments.storm.topology.GearpumpTuple]]
  * the partitioner just returns the partitions of the target
  *
  * In Gearpump, a message is sent from a task to all the subscribers.
  * In Storm, however, a message is sent to one or more of the subscribers.
- * Hence, we have to do the partitioning in [[StormOutputCollector]] till the 
Storm way
- * is supported in Gearpump
+ * Hence, we have to do the partitioning in
+ * [[io.gearpump.experiments.storm.util.StormOutputCollector]] till the Storm 
way is supported
+ * in Gearpump
  *
  * @param target target storm component id
  */
 private[storm] class StormPartitioner(target: String) extends 
MulticastPartitioner {
 
-  override def getPartitions(msg: Message, partitionNum: Int, 
currentPartitionId: Int): Array[Int] = {
+  override def getPartitions(msg: Message, partitionNum: Int, 
currentPartitionId: Int)
+    : Array[Int] = {
     val stormTuple = msg.msg.asInstanceOf[GearpumpTuple]
     stormTuple.targetPartitions.getOrElse(target, 
Array(Partitioner.UNKNOWN_PARTITION_ID))
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
index 3f1700f..5920bf8 100644
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
+++ 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,9 +22,10 @@ import java.util.{Collection => JCollection, List => JList}
 
 import backtype.storm.task.IOutputCollector
 import backtype.storm.tuple.Tuple
+
 import io.gearpump.experiments.storm.topology.TimedTuple
-import io.gearpump.experiments.storm.util.StormOutputCollector
 import io.gearpump.experiments.storm.util.StormConstants._
+import io.gearpump.experiments.storm.util.StormOutputCollector
 import io.gearpump.streaming.task.ReportCheckpointClock
 
 /**
@@ -35,11 +36,13 @@ private[storm] class StormBoltOutputCollector(collector: 
StormOutputCollector,
   private var reportTime = 0L
   private var maxAckTime = 0L
 
-  override def emit(streamId: String, anchors: JCollection[Tuple], tuple: 
JList[AnyRef]): JList[Integer] = {
+  override def emit(
+      streamId: String, anchors: JCollection[Tuple], tuple: JList[AnyRef]): 
JList[Integer] = {
     collector.emit(streamId, tuple)
   }
 
-  override def emitDirect(taskId: Int, streamId: String, anchors: 
JCollection[Tuple], tuple: JList[AnyRef]): Unit = {
+  override def emitDirect(
+      taskId: Int, streamId: String, anchors: JCollection[Tuple], tuple: 
JList[AnyRef]): Unit = {
     collector.emitDirect(taskId, streamId, tuple)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormProcessor.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormProcessor.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormProcessor.scala
index b7dc3aa..67548b3 100644
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormProcessor.scala
+++ 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/processor/StormProcessor.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,16 +19,13 @@
 package io.gearpump.experiments.storm.processor
 
 import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.Duration
 
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
 import 
io.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt
 import io.gearpump.experiments.storm.util._
 import io.gearpump.streaming.task._
-import io.gearpump.util.LogUtil
-import org.slf4j.Logger
-
-import scala.concurrent.duration.Duration
 
 object StormProcessor {
   private[storm] val TICK = Message("tick")
@@ -38,11 +35,11 @@ object StormProcessor {
  * this is runtime container for Storm bolt
  */
 private[storm] class StormProcessor(gearpumpBolt: GearpumpBolt,
-                                    taskContext: TaskContext, conf: UserConfig)
+    taskContext: TaskContext, conf: UserConfig)
   extends Task(taskContext, conf) {
   import io.gearpump.experiments.storm.processor.StormProcessor._
 
-  def this(taskContext: TaskContext, conf:UserConfig) = {
+  def this(taskContext: TaskContext, conf: UserConfig) = {
     this(StormUtil.getGearpumpStormComponent(taskContext, 
conf)(taskContext.system)
       .asInstanceOf[GearpumpBolt], taskContext, conf)
   }
@@ -67,6 +64,8 @@ private[storm] class StormProcessor(gearpumpBolt: 
GearpumpBolt,
   }
 
   private def scheduleTick(freq: Int): Unit = {
-    taskContext.scheduleOnce(Duration(freq, TimeUnit.SECONDS)){ self ! TICK }
+    taskContext.scheduleOnce(Duration(freq, TimeUnit.SECONDS)) {
+      self ! TICK
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormProducer.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormProducer.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormProducer.scala
index 787326b..39882c4 100644
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormProducer.scala
+++ 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormProducer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,16 +19,16 @@
 package io.gearpump.experiments.storm.producer
 
 import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.Duration
 
 import akka.actor.Actor.Receive
+
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
 import 
io.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout
 import io.gearpump.experiments.storm.util._
 import io.gearpump.streaming.task._
 
-import scala.concurrent.duration.Duration
-
 object StormProducer {
   private[storm] val TIMEOUT = Message("timeout")
 }
@@ -37,7 +37,7 @@ object StormProducer {
  * this is runtime container for Storm spout
  */
 private[storm] class StormProducer(gearpumpSpout: GearpumpSpout,
-                                   taskContext: TaskContext, conf: UserConfig)
+    taskContext: TaskContext, conf: UserConfig)
   extends Task(taskContext, conf) {
   import io.gearpump.experiments.storm.producer.StormProducer._
 
@@ -75,15 +75,17 @@ private[storm] class StormProducer(gearpumpSpout: 
GearpumpSpout,
       optClock.foreach { clock =>
         gearpumpSpout.checkpoint(clock)
       }
-      getCheckpointClock
+      getCheckpointClock()
   }
 
-  def getCheckpointClock: Unit = {
+  def getCheckpointClock(): Unit = {
     
taskContext.scheduleOnce(Duration(StormConstants.CHECKPOINT_INTERVAL_MILLIS,
       TimeUnit.MILLISECONDS))(taskContext.appMaster ! GetCheckpointClock)
   }
 
   private def scheduleTimeout(timeout: Long): Unit = {
-    taskContext.scheduleOnce(Duration(timeout, TimeUnit.MILLISECONDS)){ self ! 
TIMEOUT }
+    taskContext.scheduleOnce(Duration(timeout, TimeUnit.MILLISECONDS)) {
+      self ! TIMEOUT
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
index c60a962..83532dc 100644
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
+++ 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,6 +21,7 @@ package io.gearpump.experiments.storm.producer
 import java.util.{List => JList}
 
 import backtype.storm.spout.{ISpout, ISpoutOutputCollector}
+
 import io.gearpump.TimeStamp
 import io.gearpump.experiments.storm.util.StormOutputCollector
 
@@ -30,7 +31,8 @@ case class PendingMessage(id: Object, messageTime: TimeStamp, 
startTime: TimeSta
  * this is used by Storm Spout to emit messages
  */
 private[storm] class StormSpoutOutputCollector(
-    collector: StormOutputCollector, spout: ISpout, ackEnabled: Boolean) 
extends ISpoutOutputCollector {
+    collector: StormOutputCollector, spout: ISpout, ackEnabled: Boolean)
+  extends ISpoutOutputCollector {
 
   private var checkpointClock = 0L
   private var pendingMessage: Option[PendingMessage] = None
@@ -48,21 +50,21 @@ private[storm] class StormSpoutOutputCollector(
     throw throwable
   }
 
-  override def emitDirect(taskId: Int, streamId: String, values: 
JList[AnyRef], messageId: Object): Unit = {
+  override def emitDirect(taskId: Int, streamId: String, values: 
JList[AnyRef], messageId: Object)
+    : Unit = {
     val curTime = System.currentTimeMillis()
     collector.setTimestamp(curTime)
     collector.emitDirect(taskId, streamId, values)
     setPendingOrAck(messageId, curTime, curTime)
   }
 
-
   def ackPendingMessage(checkpointClock: TimeStamp): Unit = {
     this.checkpointClock = checkpointClock
     nextPendingMessage.foreach { case PendingMessage(_, messageTime, _) =>
       if (messageTime <= this.checkpointClock) {
         pendingMessage.foreach { case PendingMessage(id, _, _) =>
           spout.ack(id)
-          reset
+          reset()
         }
       }
     }
@@ -72,17 +74,18 @@ private[storm] class StormSpoutOutputCollector(
     pendingMessage.foreach { case PendingMessage(id, _, startTime) =>
       if (System.currentTimeMillis() - startTime >= timeoutMillis) {
         spout.fail(id)
-        reset
+        reset()
       }
     }
   }
 
-  private def reset: Unit = {
+  private def reset(): Unit = {
     pendingMessage = nextPendingMessage
     nextPendingMessage = None
   }
 
-  private def setPendingOrAck(messageId: Object, startTime: TimeStamp, 
messageTime: TimeStamp): Unit = {
+  private def setPendingOrAck(messageId: Object, startTime: TimeStamp, 
messageTime: TimeStamp)
+    : Unit = {
     if (ackEnabled) {
       val newPendingMessage = PendingMessage(messageId, messageTime, startTime)
       pendingMessage match {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
index 32d655d..42dce5f 100644
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
+++ 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,8 +19,11 @@
 package io.gearpump.experiments.storm.topology
 
 import java.io.{File, FileOutputStream, IOException}
+import java.util
 import java.util.jar.JarFile
 import java.util.{HashMap => JHashMap, List => JList, Map => JMap}
+import scala.collection.JavaConverters._
+import scala.concurrent.{Await, Future}
 
 import akka.actor.ActorRef
 import akka.pattern.ask
@@ -32,6 +35,9 @@ import backtype.storm.task.{GeneralTopologyContext, IBolt, 
OutputCollector, Topo
 import backtype.storm.tuple.{Fields, Tuple, TupleImpl}
 import backtype.storm.utils.Utils
 import clojure.lang.Atom
+import org.apache.commons.io.{FileUtils, IOUtils}
+import org.slf4j.Logger
+
 import io.gearpump.experiments.storm.processor.StormBoltOutputCollector
 import io.gearpump.experiments.storm.producer.StormSpoutOutputCollector
 import io.gearpump.experiments.storm.util.StormConstants._
@@ -41,12 +47,6 @@ import io.gearpump.streaming.DAG
 import io.gearpump.streaming.task._
 import io.gearpump.util.{Constants, LogUtil}
 import io.gearpump.{Message, TimeStamp}
-import org.apache.commons.io.{FileUtils, IOUtils}
-import org.slf4j.Logger
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.concurrent.{Await, Future}
 
 /**
  * subclass wraps Storm Spout and Bolt, and their lifecycles
@@ -68,7 +68,7 @@ trait GearpumpStormComponent {
   /**
    * invoked at Task.onStop
    */
-  def stop: Unit = {}
+  def stop(): Unit = {}
 }
 
 object GearpumpStormComponent {
@@ -76,9 +76,10 @@ object GearpumpStormComponent {
 
   object GearpumpSpout {
     def apply(topology: StormTopology, config: JMap[AnyRef, AnyRef],
-             spoutSpec: SpoutSpec, taskContext: TaskContext): GearpumpSpout = {
+        spoutSpec: SpoutSpec, taskContext: TaskContext): GearpumpSpout = {
       val componentCommon = spoutSpec.get_common()
-      val normalizedConfig = normalizeConfig(config.toMap, componentCommon)
+      val scalaMap = config.asScala.toMap // Convert to scala immutable map
+      val normalizedConfig = normalizeConfig(scalaMap, componentCommon)
       val getTopologyContext = (dag: DAG, taskId: TaskId) => {
         val stormTaskId = gearpumpTaskIdToStorm(taskId)
         buildTopologyContext(dag, topology, normalizedConfig, stormTaskId)
@@ -122,7 +123,6 @@ object GearpumpStormComponent {
 
     private var collector: StormSpoutOutputCollector = null
 
-
     override def start(startTime: StartTime): Unit = {
       import taskContext.{appMaster, taskId}
 
@@ -161,8 +161,9 @@ object GearpumpStormComponent {
 
   object GearpumpBolt {
     def apply(topology: StormTopology, config: JMap[AnyRef, AnyRef],
-              boltSpec: Bolt, taskContext: TaskContext): GearpumpBolt = {
-      val normalizedConfig = normalizeConfig(config.toMap, 
boltSpec.get_common())
+        boltSpec: Bolt, taskContext: TaskContext): GearpumpBolt = {
+      val configAsScalaMap = config.asScala.toMap // Convert to scala 
immutable map
+      val normalizedConfig = normalizeConfig(configAsScalaMap, 
boltSpec.get_common())
       val getTopologyContext = (dag: DAG, taskId: TaskId) => {
         val stormTaskId = gearpumpTaskIdToStorm(taskId)
         buildTopologyContext(dag, topology, normalizedConfig, stormTaskId)
@@ -174,8 +175,10 @@ object GearpumpStormComponent {
         StormOutputCollector(taskContext, topologyContext)
       }
       val getTickTuple = (topologyContext: GeneralTopologyContext, freq: Int) 
=> {
-        new TupleImpl(topologyContext, 
List(freq.asInstanceOf[java.lang.Integer]),
-          SYSTEM_TASK_ID, SYSTEM_TICK_STREAM_ID, null)
+
+        val values = new util.ArrayList[Object] // To be compatible with Java 
interface
+        values.add(freq.asInstanceOf[java.lang.Integer])
+        new TupleImpl(topologyContext, values, SYSTEM_TASK_ID, 
SYSTEM_TICK_STREAM_ID, null)
       }
       GearpumpBolt(
         normalizedConfig,
@@ -218,7 +221,8 @@ object GearpumpStormComponent {
     override def next(message: Message): Unit = {
       val timestamp = message.timestamp
       collector.setTimestamp(timestamp)
-      
bolt.execute(message.msg.asInstanceOf[GearpumpTuple].toTuple(generalTopologyContext,
 timestamp))
+      
bolt.execute(message.msg.asInstanceOf[GearpumpTuple].toTuple(generalTopologyContext,
+        timestamp))
     }
 
     def getTickFrequency: Option[Int] = {
@@ -244,9 +248,9 @@ object GearpumpStormComponent {
    * @param componentCommon common component parts
    */
   private def normalizeConfig(stormConfig: Map[AnyRef, AnyRef],
-                              componentCommon: ComponentCommon): JMap[AnyRef, 
AnyRef] = {
+      componentCommon: ComponentCommon): JMap[AnyRef, AnyRef] = {
     val config: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
-    config.putAll(stormConfig)
+    config.putAll(stormConfig.asJava)
     val componentConfig = parseJsonStringToMap(componentCommon.get_json_conf())
     Option(componentConfig.get(Config.TOPOLOGY_TRANSACTIONAL_ID))
       .foreach(config.put(Config.TOPOLOGY_TRANSACTIONAL_ID, _))
@@ -261,42 +265,63 @@ object GearpumpStormComponent {
     Await.result(dagFuture, timeout.duration)
   }
 
-  private def buildGeneralTopologyContext(dag: DAG, topology: StormTopology, 
stormConf: JMap[_, _]): GeneralTopologyContext = {
+  private def buildGeneralTopologyContext(dag: DAG, topology: StormTopology, 
stormConf: JMap[_, _])
+    : GeneralTopologyContext = {
     val taskToComponent = getTaskToComponent(dag)
-    val componentToSortedTasks: JMap[String, JList[Integer]] = 
getComponentToSortedTasks(taskToComponent)
-    val componentToStreamFields: JMap[String, JMap[String, Fields]] = 
getComponentToStreamFields(topology)
-    new GeneralTopologyContext(topology, stormConf, taskToComponent, 
componentToSortedTasks, componentToStreamFields, null)
+    val componentToSortedTasks: JMap[String, JList[Integer]] =
+      getComponentToSortedTasks(taskToComponent)
+    val componentToStreamFields: JMap[String, JMap[String, Fields]] =
+      getComponentToStreamFields(topology)
+    new GeneralTopologyContext(
+      topology, stormConf, taskToComponent.asJava, componentToSortedTasks,
+      componentToStreamFields, null)
   }
 
-  private def buildTopologyContext(dag: DAG, topology: StormTopology, 
stormConf: JMap[_, _], stormTaskId: Integer): TopologyContext = {
+  private def buildTopologyContext(
+      dag: DAG, topology: StormTopology, stormConf: JMap[_, _], stormTaskId: 
Integer)
+    : TopologyContext = {
     val taskToComponent = getTaskToComponent(dag)
-    val componentToSortedTasks: JMap[String, JList[Integer]] = 
getComponentToSortedTasks(taskToComponent)
-    val componentToStreamFields: JMap[String, JMap[String, Fields]] = 
getComponentToStreamFields(topology)
+    val componentToSortedTasks: JMap[String, JList[Integer]] =
+      getComponentToSortedTasks(taskToComponent)
+    val componentToStreamFields: JMap[String, JMap[String, Fields]] =
+      getComponentToStreamFields(topology)
     val codeDir = mkCodeDir
     val pidDir = mkPidDir
 
-    new TopologyContext(topology, stormConf, taskToComponent, 
componentToSortedTasks,
-      componentToStreamFields, null, codeDir, pidDir, stormTaskId, null, null, 
null, null, new JHashMap[String, AnyRef],
-      new JHashMap[Integer, JMap[Integer, JMap[String, IMetric]]], new 
Atom(false))
+    new TopologyContext(topology, stormConf, taskToComponent.asJava, 
componentToSortedTasks,
+      componentToStreamFields, null, codeDir, pidDir, stormTaskId, null, null, 
null, null,
+      new JHashMap[String, AnyRef], new JHashMap[Integer, JMap[Integer, 
JMap[String, IMetric]]],
+      new Atom(false))
   }
 
-  private def getComponentToStreamFields(topology: StormTopology): 
JMap[String, JMap[String, Fields]] = {
-    val spouts = topology.get_spouts()
-    val bolts = topology.get_bolts()
+  private def getComponentToStreamFields(topology: StormTopology)
+    : JMap[String, JMap[String, Fields]] = {
+    val spouts = topology.get_spouts().asScala
+    val bolts = topology.get_bolts().asScala
 
-    (spouts.map { case (id, component) => id -> 
getComponentToFields(component.get_common()) } ++
-        bolts.map { case (id, component) => id -> 
getComponentToFields(component.get_common())} ++
-        Map(SYSTEM_COMPONENT_ID -> Map(SYSTEM_TICK_STREAM_ID -> new 
Fields(SYSTEM_COMPONENT_OUTPUT_FIELDS)).asJava)
-        ).toMap.asJava
+    val spoutFields = spouts.map {
+      case (id, component) => id -> 
getComponentToFields(component.get_common())
+    }
+
+    val boltFields = bolts.map {
+      case (id, component) => id -> 
getComponentToFields(component.get_common())
+    }
+
+    val systemFields = Map(SYSTEM_COMPONENT_ID ->
+      Map(SYSTEM_TICK_STREAM_ID -> new 
Fields(SYSTEM_COMPONENT_OUTPUT_FIELDS)).asJava)
+
+    (spoutFields ++ boltFields ++ systemFields).asJava
   }
 
   private def getComponentToFields(common: ComponentCommon): JMap[String, 
Fields] = {
-    common.get_streams.map { case (sid, stream) =>
+    val streams = common.get_streams.asScala
+    streams.map { case (sid, stream) =>
       sid -> new Fields(stream.get_output_fields())
-    }.toMap.asJava
+    }.asJava
   }
 
-  private def getComponentToSortedTasks(taskToComponent: Map[Integer, 
String]): JMap[String, JList[Integer]] = {
+  private def getComponentToSortedTasks(
+      taskToComponent: Map[Integer, String]): JMap[String, JList[Integer]] = {
     taskToComponent.groupBy(_._2).map { case (component, map) =>
       val sortedTasks = map.keys.toList.sorted.asJava
       component -> sortedTasks
@@ -307,12 +332,13 @@ object GearpumpStormComponent {
     val taskToComponent = dag.processors.flatMap { case (processorId, 
processorDescription) =>
       val parallelism = processorDescription.parallelism
       val component = 
processorDescription.taskConf.getString(STORM_COMPONENT).get
-      (0 until parallelism).map(index => 
gearpumpTaskIdToStorm(TaskId(processorId, index)) -> component)
+      (0 until parallelism).map(index =>
+        gearpumpTaskIdToStorm(TaskId(processorId, index)) -> component)
     }
     taskToComponent
   }
 
-  // a workaround to support storm ShellBolt
+  // Workarounds to support storm ShellBolt
   private def mkPidDir: String = {
     val pidDir = FileUtils.getTempDirectoryPath + File.separator + "pid"
     try {
@@ -333,7 +359,7 @@ object GearpumpStormComponent {
       FileUtils.forceMkdir(new File(destDir))
 
       val jar = new JarFile(jarPath)
-      val enumEntries = jar.entries()
+      val enumEntries = jar.entries().asScala
       enumEntries.foreach { entry =>
         val file = new File(destDir + File.separator + entry.getName)
         if (!entry.isDirectory) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
index b88a200..5c3fc3e 100644
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
+++ 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -25,6 +25,8 @@ import akka.actor.ActorSystem
 import backtype.storm.Config
 import backtype.storm.generated._
 import backtype.storm.utils.{ThriftTopologyUtils, Utils}
+import org.slf4j.Logger
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.experiments.storm.processor.StormProcessor
 import io.gearpump.experiments.storm.producer.StormProducer
@@ -34,9 +36,11 @@ import io.gearpump.experiments.storm.util.StormUtil._
 import io.gearpump.streaming.Processor
 import io.gearpump.streaming.task.Task
 import io.gearpump.util.LogUtil
-import org.slf4j.Logger
 
+// TODO: Refactor this file, we should disable using of JavaConversions
+// scalastyle:off javaconversions
 import scala.collection.JavaConversions._
+// scalastyle:on javaconversions
 
 object GearpumpStormTopology {
   private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpStormTopology])
@@ -51,16 +55,14 @@ object GearpumpStormTopology {
       Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]],
       parseJsonStringToMap(appConfigInJson)
     )
-
   }
-
 }
 
 /**
  * this is a wrapper over Storm topology which
- *   1. merges Storm and Gearpump configs
- *   2. creates Gearpump processors
- *   3. provides interface for Gearpump applications to use Storm topology
+ * 1. merges Storm and Gearpump configs
+ * 2. creates Gearpump processors
+ * 3. provides interface for Gearpump applications to use Storm topology
  *
  * an implicit ActorSystem is required to create Gearpump processors
  * @param name topology name
@@ -78,14 +80,16 @@ private[storm] class GearpumpStormTopology(
   private val bolts = topology.get_bolts()
   private val stormConfig = mergeConfigs(sysConfig, appConfig, 
getComponentConfigs(spouts, bolts))
   private val spoutProcessors = spouts.map { case (id, spout) =>
-    id -> spoutToProcessor(id, spout, stormConfig.toMap) }.toMap
+    id -> spoutToProcessor(id, spout, stormConfig.toMap)
+  }.toMap
   private val boltProcessors = bolts.map { case (id, bolt) =>
-    id -> boltToProcessor(id, bolt, stormConfig.toMap) }.toMap
+    id -> boltToProcessor(id, bolt, stormConfig.toMap)
+  }.toMap
   private val allProcessors = spoutProcessors ++ boltProcessors
 
   /**
    * @return merged Storm config with priority
-   *    defaults.yaml < custom file config < application config < component 
config
+   *         defaults.yaml < custom file config < application config < 
component config
    */
   def getStormConfig: JMap[AnyRef, AnyRef] = stormConfig
 
@@ -131,7 +135,7 @@ private[storm] class GearpumpStormTopology(
       stormConfig: Map[AnyRef, AnyRef])(implicit system: ActorSystem): 
Processor[Task] = {
     val componentCommon = spoutSpec.get_common()
     val taskConf = UserConfig.empty
-        .withString(STORM_COMPONENT, spoutId)
+      .withString(STORM_COMPONENT, spoutId)
     val parallelism = getParallelism(stormConfig, componentCommon)
     Processor[StormProducer](parallelism, spoutId, taskConf)
   }
@@ -148,8 +152,8 @@ private[storm] class GearpumpStormTopology(
       stormConfig: Map[AnyRef, AnyRef])(implicit system: ActorSystem): 
Processor[Task] = {
     val componentCommon = boltSpec.get_common()
     val taskConf = UserConfig.empty
-        .withString(STORM_COMPONENT, boltId)
-        .withBoolean("state.checkpoint.enable", 
StormUtil.ackEnabled(stormConfig))
+      .withString(STORM_COMPONENT, boltId)
+      .withBoolean("state.checkpoint.enable", 
StormUtil.ackEnabled(stormConfig))
     val parallelism = getParallelism(stormConfig, componentCommon)
     Processor[StormProcessor](parallelism, boltId, taskConf)
   }
@@ -157,7 +161,8 @@ private[storm] class GearpumpStormTopology(
   /**
    * @return target components and streams
    */
-  private def getTargets(componentId: String, topology: StormTopology): 
Map[String, Map[String, Grouping]] = {
+  private def getTargets(componentId: String, topology: StormTopology)
+    : Map[String, Map[String, Grouping]] = {
     val componentIds = ThriftTopologyUtils.getComponentIds(topology)
     componentIds.flatMap { otherComponentId =>
       getInputs(otherComponentId, topology).toList.map(otherComponentId -> _)
@@ -178,16 +183,18 @@ private[storm] class GearpumpStormTopology(
   /**
    * @return input stream and grouping for a Storm component
    */
-  private def getInputs(componentId: String, topology: StormTopology): 
JMap[GlobalStreamId, Grouping] = {
+  private def getInputs(componentId: String, topology: StormTopology)
+    : JMap[GlobalStreamId, Grouping] = {
     ThriftTopologyUtils.getComponentCommon(topology, componentId).get_inputs
   }
 
   /**
    * get Storm component parallelism according to the following rule,
-   *   1. use "topology.tasks" if defined; otherwise use parallelism_hint
-   *   2. parallelism should not be larger than 
"topology.max.task.parallelism" if defined
-   *   3. component config overrides system config
-   * @param stormConfig system configs without merging "topology.tasks" and 
"topology.max.task.parallelism" of component
+   * 1. use "topology.tasks" if defined; otherwise use parallelism_hint
+   * 2. parallelism should not be larger than "topology.max.task.parallelism" 
if defined
+   * 3. component config overrides system config
+   * @param stormConfig System configs without merging "topology.tasks" and
+   *                    "topology.max.task.parallelism" of component
    * @return number of task instances for a component
    */
   private def getParallelism(stormConfig: Map[AnyRef, AnyRef], component: 
ComponentCommon): Int = {
@@ -207,7 +214,7 @@ private[storm] class GearpumpStormTopology(
   }
 
   private def getComponentConfigs(spouts: JMap[String, SpoutSpec],
-                                  bolts: JMap[String, Bolt]): 
Iterable[JMap[AnyRef, AnyRef]] = {
+      bolts: JMap[String, Bolt]): Iterable[JMap[AnyRef, AnyRef]] = {
     spouts.map { case (id, spoutSpec) =>
       parseJsonStringToMap(spoutSpec.get_common().get_json_conf())
     } ++ bolts.map { case (id, boltSpec) =>
@@ -222,7 +229,7 @@ private[storm] class GearpumpStormTopology(
    * @return the two configs merged from all the component configs and 
existing configs
    */
   private def getMergedComponentConfig(componentConfigs: Iterable[JMap[AnyRef, 
AnyRef]],
-                                       allConfig: Map[AnyRef, AnyRef]): 
JMap[AnyRef, AnyRef] = {
+      allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = {
     val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
     mergedConfig.putAll(getMergedKryoDecorators(componentConfigs, allConfig))
     mergedConfig.putAll(getMergedKryoRegister(componentConfigs, allConfig))
@@ -232,10 +239,11 @@ private[storm] class GearpumpStormTopology(
   /**
    * @param componentConfigs list of component configs
    * @param allConfig existing configs without merging component configs
-   * @return a merged config with a list of distinct kryo decorators from 
component and existing configs
+   * @return a merged config with a list of distinct kryo decorators from 
component and
+   *         existing configs
    */
   private def getMergedKryoDecorators(componentConfigs: Iterable[JMap[AnyRef, 
AnyRef]],
-                                allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, 
AnyRef] = {
+      allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = {
     val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef](1)
     val key = Config.TOPOLOGY_KRYO_DECORATORS
     val configs = getConfigValues(componentConfigs, allConfig, key)
@@ -250,7 +258,6 @@ private[storm] class GearpumpStormTopology(
         accum
       case illegal =>
         throw new IllegalArgumentException(s"$key must be a List of Strings; 
actually $illegal")
-
     }
     if (distincts.nonEmpty) {
       val decorators: JList[String] = new JArrayList(distincts.size)
@@ -266,7 +273,7 @@ private[storm] class GearpumpStormTopology(
    * @return a merged config with component config overriding existing configs
    */
   private def getMergedKryoRegister(componentConfigs: Iterable[JMap[AnyRef, 
AnyRef]],
-                              allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, 
AnyRef] = {
+      allConfig: Map[AnyRef, AnyRef]): JMap[AnyRef, AnyRef] = {
     val mergedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef](1)
     val key = Config.TOPOLOGY_KRYO_REGISTER
     val configs = getConfigValues(componentConfigs, allConfig, key)
@@ -275,7 +282,7 @@ private[storm] class GearpumpStormTopology(
         accum ++ config.map {
           case m: JMap[_, _] =>
             m.map {
-              case (k: String, v: String) => k ->v
+              case (k: String, v: String) => k -> v
               case illegal =>
                 throw new IllegalArgumentException(
                   s"each element of $key must be a String or a Map of Strings; 
actually $illegal")
@@ -283,7 +290,8 @@ private[storm] class GearpumpStormTopology(
           case s: String =>
             Map(s -> null)
           case illegal =>
-            throw new IllegalArgumentException(s"each element of $key must be 
a String or a Map of Strings; actually $illegal")
+            throw new IllegalArgumentException(s"each element of $key must be 
a String or " +
+              s"a Map of Strings; actually $illegal")
         }.reduce(_ ++ _)
       case (accum, null) =>
         accum
@@ -294,7 +302,7 @@ private[storm] class GearpumpStormTopology(
     if (merged.nonEmpty) {
       val registers: JMap[String, String] = new JHashMap[String, 
String](merged.size)
       registers.putAll(merged)
-     mergedConfig.put(key, registers)
+      mergedConfig.put(key, registers)
     }
     mergedConfig
   }
@@ -306,7 +314,7 @@ private[storm] class GearpumpStormTopology(
    * @return a list of values for a config from both component configs and 
existing configs
    */
   private def getConfigValues(componentConfigs: Iterable[JMap[AnyRef, AnyRef]],
-                              allConfig: Map[AnyRef, AnyRef], key: String): 
Iterable[AnyRef] = {
+      allConfig: Map[AnyRef, AnyRef], key: String): Iterable[AnyRef] = {
     componentConfigs.map(config => config.get(key)) ++ 
allConfig.get(key).toList
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala
index 7662f36..ee54add 100644
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala
+++ 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpTuple.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,8 +22,8 @@ import java.util.{List => JList}
 
 import backtype.storm.task.GeneralTopologyContext
 import backtype.storm.tuple.{Tuple, TupleImpl}
-import io.gearpump.TimeStamp
 
+import io.gearpump.TimeStamp
 
 /**
  * this carries Storm tuple values in the Gearpump world
@@ -36,9 +36,9 @@ private[storm] class GearpumpTuple(
     val sourceStreamId: String,
     @transient val targetPartitions: Map[String, Array[Int]]) extends 
Serializable {
   /**
-   * creates a Storm [[Tuple]] to be passed to a Storm component
+   * creates a Storm [[backtype.storm.tuple.Tuple]] to be passed to a Storm 
component
    * this is needed for each incoming message
-   * because we cannot get [[GeneralTopologyContext]] at deserialization
+   * because we cannot get [[backtype.storm.task.GeneralTopologyContext]] at 
deserialization
    * @param topologyContext topology context used for all tasks
    * @return a Tuple
    */
@@ -46,15 +46,14 @@ private[storm] class GearpumpTuple(
     TimedTuple(topologyContext, values, sourceTaskId, sourceStreamId, 
timestamp)
   }
 
-
   def canEqual(other: Any): Boolean = other.isInstanceOf[GearpumpTuple]
 
   override def equals(other: Any): Boolean = other match {
     case that: GearpumpTuple =>
       (that canEqual this) &&
-          values == that.values &&
-          sourceTaskId == that.sourceTaskId &&
-          sourceStreamId == that.sourceStreamId
+        values == that.values &&
+        sourceTaskId == that.sourceTaskId &&
+        sourceStreamId == that.sourceStreamId
     case _ => false
   }
 
@@ -68,4 +67,3 @@ case class TimedTuple(topologyContext: 
GeneralTopologyContext, tuple: JList[AnyR
     sourceTaskId: Integer, sourceStreamId: String, timestamp: TimeStamp)
   extends TupleImpl(topologyContext, tuple, sourceTaskId, sourceStreamId, null)
 
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/GraphBuilder.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/GraphBuilder.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/GraphBuilder.scala
index e9f03f3..041a90c 100644
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/GraphBuilder.scala
+++ 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/GraphBuilder.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,7 +18,6 @@
 
 package io.gearpump.experiments.storm.util
 
-
 import io.gearpump.experiments.storm.partitioner.StormPartitioner
 import io.gearpump.experiments.storm.topology.GearpumpStormTopology
 import io.gearpump.partitioner.Partitioner

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala
index d727b7a..91277cd 100644
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala
+++ 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/Grouper.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,18 +19,15 @@
 package io.gearpump.experiments.storm.util
 
 import java.util.{List => JList}
+import scala.util.Random
 
 import backtype.storm.generated.GlobalStreamId
 import backtype.storm.grouping.CustomStreamGrouping
 import backtype.storm.task.TopologyContext
 import backtype.storm.tuple.Fields
 
-import scala.collection.JavaConversions._
-import scala.util.Random
-
 /**
- * Grouper is identical to that in storm but return gearpump
- * partitions for storm tuple values
+ * Grouper is identical to that in storm but return gearpump partitions for 
storm tuple values
  */
 sealed trait Grouper {
   /**
@@ -50,6 +47,7 @@ class GlobalGrouper extends Grouper {
 
 /**
  * NoneGrouper randomly returns partition
+ *
  * @param numTasks number of target tasks
  */
 class NoneGrouper(numTasks: Int) extends Grouper {
@@ -62,8 +60,8 @@ class NoneGrouper(numTasks: Int) extends Grouper {
 }
 
 /**
- * ShuffleGrouper shuffles partitions and returns them sequentially,
- * and then shuffles again
+ * ShuffleGrouper shuffles partitions and returns them sequentially, and then 
shuffles again
+ *
  * @param numTasks number of target tasks
  */
 class ShuffleGrouper(numTasks: Int) extends Grouper {
@@ -86,6 +84,7 @@ class ShuffleGrouper(numTasks: Int) extends Grouper {
 
 /**
  * FieldsGrouper returns partition based on value of groupFields
+ *
  * @param outFields declared output fields of source task
  * @param groupFields grouping fields of target tasks
  * @param numTasks number of target tasks
@@ -101,6 +100,7 @@ class FieldsGrouper(outFields: Fields, groupFields: Fields, 
numTasks: Int) exten
 
 /**
  * AllGrouper returns all partitions
+ *
  * @param numTasks number of target tasks
  */
 class AllGrouper(numTasks: Int) extends Grouper {
@@ -113,16 +113,30 @@ class AllGrouper(numTasks: Int) extends Grouper {
 
 /**
  * CustomGrouper allows users to specify grouping strategy
- * @param grouping see [[CustomStreamGrouping]]
+ *
+ * @param grouping see [[backtype.storm.grouping.CustomStreamGrouping]]
  */
 class CustomGrouper(grouping: CustomStreamGrouping) extends Grouper {
 
-  def prepare(topologyContext: TopologyContext, globalStreamId: 
GlobalStreamId, targetTasks: JList[Integer]): Unit = {
+  def prepare(
+      topologyContext: TopologyContext, globalStreamId: GlobalStreamId, 
targetTasks: JList[Integer])
+    : Unit = {
     grouping.prepare(topologyContext, globalStreamId, targetTasks)
   }
 
   override def getPartitions(taskId: Int, values: JList[AnyRef]): Array[Int] = 
{
-    grouping.chooseTasks(taskId, 
values).map(StormUtil.stormTaskIdToGearpump(_).index).toArray
+    val tasks = grouping.chooseTasks(taskId, values)
+    val result = new Array[Int](tasks.size())
+
+    val iter = tasks.iterator()
+
+    var index = 0
+    while (iter.hasNext()) {
+      val value = iter.next()
+      result(index) = StormUtil.stormTaskIdToGearpump(value).index
+      index += 1
+    }
+    result
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormConstants.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormConstants.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormConstants.scala
index 58e7160..928d07b 100644
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormConstants.scala
+++ 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormConstants.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,7 +27,7 @@ object StormConstants {
   val SYSTEM_TASK_ID: Integer = -1
   val SYSTEM_TICK_STREAM_ID = "__tick"
 
-  val CHECKPOINT_INTERVAL_MILLIS = 2000  // 2 seconds
+  val CHECKPOINT_INTERVAL_MILLIS = 2000 // 2 seconds
 
   val STORM_SERIALIZATION_FRAMEWORK = "gearpump.storm.serialization-framework"
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala
index c1df63b..74d1b2b 100644
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala
+++ 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormOutputCollector.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,21 +19,21 @@
 package io.gearpump.experiments.storm.util
 
 import java.util.{ArrayList => JArrayList, Iterator => JIterator, List => 
JList, Map => JMap}
+import scala.collection.JavaConverters._
 
 import backtype.storm.generated.{GlobalStreamId, Grouping, JavaObject}
 import backtype.storm.grouping.CustomStreamGrouping
 import backtype.storm.task.TopologyContext
 import backtype.storm.tuple.Fields
 import backtype.storm.utils.Utils
+import org.slf4j.Logger
+
 import io.gearpump._
 import io.gearpump.experiments.storm.topology.GearpumpTuple
 import io.gearpump.experiments.storm.util.StormUtil._
 import io.gearpump.streaming.ProcessorId
 import io.gearpump.streaming.task.{TaskContext, TaskId}
 import io.gearpump.util.LogUtil
-import org.slf4j.Logger
-
-import scala.collection.JavaConversions._
 
 object StormOutputCollector {
   private val LOG: Logger = LogUtil.getLogger(classOf[StormOutputCollector])
@@ -43,18 +43,20 @@ object StormOutputCollector {
     val stormTaskId = topologyContext.getThisTaskId
     val componentId = topologyContext.getThisComponentId
     val taskToComponent = topologyContext.getTaskToComponent
-    val componentToProcessorId = 
getComponentToProcessorId(taskToComponent.toMap)
+    val componentToProcessorId = 
getComponentToProcessorId(taskToComponent.asScala.toMap)
     val targets = topologyContext.getTargets(componentId)
     val streamGroupers: Map[String, Grouper] =
-      targets.flatMap { case (streamId, targetGrouping) =>
-        targetGrouping.collect { case (target, grouping) if 
!grouping.is_set_direct() =>
+      targets.asScala.flatMap { case (streamId, targetGrouping) =>
+        targetGrouping.asScala.collect { case (target, grouping) if 
!grouping.is_set_direct() =>
           streamId -> getGrouper(topologyContext, grouping, componentId, 
streamId, target)
         }
       }.toMap
     val getTargetPartitionsFn = (streamId: String, values: JList[AnyRef]) => {
-      getTargetPartitions(stormTaskId, streamId, targets, streamGroupers, 
componentToProcessorId, values)
+      getTargetPartitions(stormTaskId, streamId, targets,
+        streamGroupers, componentToProcessorId, values)
     }
-    new StormOutputCollector(stormTaskId, taskToComponent, targets, 
getTargetPartitionsFn, taskContext, LatestTime)
+    new StormOutputCollector(stormTaskId, taskToComponent, targets, 
getTargetPartitionsFn,
+      taskContext, LatestTime)
   }
 
   /**
@@ -66,7 +68,7 @@ object StormOutputCollector {
       targets: JMap[String, JMap[String, Grouping]],
       streamGroupers: Map[String, Grouper],
       componentToProcessorId: Map[String, ProcessorId],
-      values: JList[AnyRef]): (Map[String, Array[Int]], JList[Integer]) ={
+      values: JList[AnyRef]): (Map[String, Array[Int]], JList[Integer]) = {
     val ret: JList[Integer] = new JArrayList[Integer](targets.size)
 
     @annotation.tailrec
@@ -85,18 +87,20 @@ object StormOutputCollector {
         accum
       }
     }
-    val targetPartitions = getRecur(targets.get(streamId).keySet().iterator, 
Map.empty[String, Array[Int]])
+    val targetPartitions = getRecur(targets.get(streamId).keySet().iterator,
+      Map.empty[String, Array[Int]])
     (targetPartitions, ret)
   }
 
-  private def getComponentToProcessorId(taskToComponent: Map[Integer, 
String]): Map[String, ProcessorId] = {
+  private def getComponentToProcessorId(taskToComponent: Map[Integer, String])
+    : Map[String, ProcessorId] = {
     taskToComponent.map { case (id, component) =>
       component -> stormTaskIdToGearpump(id).processorId
     }
   }
 
   private def getGrouper(topologyContext: TopologyContext, grouping: Grouping,
-                         source: String, streamId: String, target: String): 
Grouper = {
+      source: String, streamId: String, target: String): Grouper = {
     val outFields = topologyContext.getComponentOutputFields(source, streamId)
     val targetTasks = topologyContext.getComponentTasks(target)
     val targetTaskNum = targetTasks.size
@@ -116,7 +120,8 @@ object StormOutputCollector {
       case Grouping._Fields.ALL =>
         new AllGrouper(targetTaskNum)
       case Grouping._Fields.CUSTOM_SERIALIZED =>
-        val customGrouping = 
Utils.javaDeserialize(grouping.get_custom_serialized, 
classOf[Serializable]).asInstanceOf[CustomStreamGrouping]
+        val customGrouping = 
Utils.javaDeserialize(grouping.get_custom_serialized,
+          classOf[Serializable]).asInstanceOf[CustomStreamGrouping]
         val grouper = new CustomGrouper(customGrouping)
         grouper.prepare(topologyContext, globalStreamId, targetTasks)
         grouper
@@ -141,7 +146,7 @@ object StormOutputCollector {
 
   private def instantiateJavaObject(javaObject: JavaObject): 
CustomStreamGrouping = {
     val className = javaObject.get_full_class_name()
-    val args = javaObject.get_args_list().map(_.getFieldValue)
+    val args = javaObject.get_args_list().asScala.map(_.getFieldValue)
     val customGrouping = 
Class.forName(className).getConstructor(args.map(_.getClass): _*)
       .newInstance(args).asInstanceOf[CustomStreamGrouping]
     customGrouping
@@ -149,7 +154,8 @@ object StormOutputCollector {
 }
 
 /**
- * this provides common functionality for 
[[io.gearpump.experiments.storm.producer.StormSpoutOutputCollector]]
+ * Provides common functionality for
+ * [[io.gearpump.experiments.storm.producer.StormSpoutOutputCollector]]
  * and [[io.gearpump.experiments.storm.processor.StormBoltOutputCollector]]
  */
 class StormOutputCollector(
@@ -162,14 +168,15 @@ class StormOutputCollector(
   import io.gearpump.experiments.storm.util.StormOutputCollector._
 
   /**
-   * this is invoked by a Storm output collector to emit tuple values into a 
stream.
-   * We will wrap the values into a message of [[GearpumpTuple]] along with 
the target partitions
-   * to tell [[io.gearpump.experiments.storm.partitioner.StormPartitioner]] 
where to send the message.
-   * We also return the corresponding target Storm task ids back to the 
collector
+   * Emits tuple values into a stream (invoked by a Storm output collector).
+   *
+   * wrapS the values into a message of [[GearpumpTuple]] along with the 
target partitions
+   * to tell [[io.gearpump.experiments.storm.partitioner.StormPartitioner]] 
where to send the
+   * message. We also return the corresponding target Storm task ids back to 
the collector
    *
    * @param streamId Storm stream id
    * @param values Storm tuple values
-   * @return target Storm task ids
+   * @return Target Storm task ids
    */
   def emit(streamId: String, values: JList[AnyRef]): JList[Integer] = {
     if (targets.containsKey(streamId)) {
@@ -183,10 +190,11 @@ class StormOutputCollector(
   }
 
   /**
-   * this is invoked by Storm output collector to emit tuple values to a 
specific Storm task.
-   * we translate the Storm task id into Gearpump TaskId and tell
+   * Emit tuple values to a specific Storm task (invoked by Storm output 
collector).
+   *
+   * We translate the Storm task id into Gearpump TaskId and tell
    * [[io.gearpump.experiments.storm.partitioner.StormPartitioner]] through 
the targetPartitions
-   * field of [[GearpumpTuple]]
+   * field of [[io.gearpump.experiments.storm.topology.GearpumpTuple]]
    *
    * @param id Storm task id
    * @param streamId Storm stream id
@@ -203,8 +211,7 @@ class StormOutputCollector(
   }
 
   /**
-   * get timestamp from each incoming Message and
-   * which will be set into output messages
+   * set timestamp from each incoming Message if not attached.
    */
   def setTimestamp(timestamp: TimeStamp): Unit = {
     this.timestamp = timestamp

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormSerializationFramework.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormSerializationFramework.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormSerializationFramework.scala
index f306de6..f1c7bab 100644
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormSerializationFramework.scala
+++ 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormSerializationFramework.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,11 +19,13 @@ package io.gearpump.experiments.storm.util
 
 import java.lang.{Integer => JInteger}
 import java.util.{Map => JMap}
+
 import akka.actor.ExtendedActorSystem
 import backtype.storm.serialization.SerializationFactory
 import backtype.storm.utils.ListDelegate
 import com.esotericsoftware.kryo.Kryo
 import com.esotericsoftware.kryo.io.{Input, Output}
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.experiments.storm.topology.GearpumpTuple
 import io.gearpump.experiments.storm.util.StormConstants._
@@ -50,8 +52,9 @@ class StormSerializationFramework extends 
SerializationFramework {
 }
 
 /**
- * serializes / deserializes [[GearpumpTuple]]
- * @param kryo created by Storm [[SerializationFactory]]
+ * serializes / deserializes 
[[io.gearpump.experiments.storm.topology.GearpumpTuple]]
+ *
+ * @param kryo created by Storm 
[[backtype.storm.serialization.SerializationFactory]]
  */
 class StormSerializer(kryo: Kryo) extends Serializer {
   // -1 means the max buffer size is 2147483647

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala
 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala
index 4f4fced..554210c 100644
--- 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala
+++ 
b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,21 +24,22 @@ import java.util.{HashMap => JHashMap, Map => JMap}
 import akka.actor.ActorSystem
 import backtype.storm.Config
 import backtype.storm.generated._
+import org.apache.storm.shade.org.json.simple.JSONValue
+
 import io.gearpump.cluster.UserConfig
 import 
io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, 
GearpumpSpout}
 import io.gearpump.experiments.storm.topology._
 import io.gearpump.experiments.storm.util.StormConstants._
 import io.gearpump.streaming.task.{TaskContext, TaskId}
 import io.gearpump.util.Util
-import org.apache.storm.shade.org.json.simple.JSONValue
 
 object StormUtil {
 
-
   /**
-   * convert storm task id to gearpump [[TaskId]]
-   * the high 16 bit of an Int is TaskId.processorId
-   * the low 16 bit of an Int is TaskId.index
+   * Convert storm task id to gearpump [[io.gearpump.streaming.task.TaskId]]
+   *
+   * The high 16 bit of an Int is TaskId.processorId
+   * The low 16 bit of an Int is TaskId.index
    */
   def stormTaskIdToGearpump(id: Integer): TaskId = {
     val index = id & 0xFFFF
@@ -57,11 +58,12 @@ object StormUtil {
     (processorId << 16) + (index & 0xFFFF)
   }
 
-
   /**
    * @return a configured [[GearpumpStormComponent]]
    */
-  def getGearpumpStormComponent(taskContext: TaskContext, conf: 
UserConfig)(implicit system: ActorSystem): GearpumpStormComponent = {
+  def getGearpumpStormComponent(
+      taskContext: TaskContext, conf: UserConfig)(implicit system: ActorSystem)
+    : GearpumpStormComponent = {
     val topology = conf.getValue[StormTopology](STORM_TOPOLOGY).get
     val stormConfig = conf.getValue[JMap[AnyRef, AnyRef]](STORM_CONFIG).get
     val componentId = conf.getString(STORM_COMPONENT).get
@@ -77,8 +79,8 @@ object StormUtil {
   }
 
   /**
-   * parse config in json to map
-   * return empty map for invalid json string
+   * Parses config in json to map, returns empty map for invalid json string
+   *
    * @param json config in json
    * @return config in map
    */
@@ -95,7 +97,8 @@ object StormUtil {
   def getInt(conf: JMap[_, _], name: String): Option[Int] = {
     Option(conf.get(name)).map {
       case number: Number => number.intValue
-      case invalid => throw new IllegalArgumentException(s"$name must be Java 
Integer; actual: ${invalid.getClass}")
+      case invalid => throw new IllegalArgumentException(
+        s"$name must be Java Integer; actual: ${invalid.getClass}")
     }
   }
 
@@ -105,7 +108,8 @@ object StormUtil {
   def getBoolean(conf: JMap[_, _], name: AnyRef): Option[Boolean] = {
     Option(conf.get(name)).map {
       case b: JBoolean => b.booleanValue()
-      case invalid => throw new IllegalArgumentException(s"$name must be a 
Java Boolean; acutal: ${invalid.getClass}")
+      case invalid => throw new IllegalArgumentException(
+        s"$name must be a Java Boolean; acutal: ${invalid.getClass}")
     }
   }
 
@@ -126,7 +130,7 @@ object StormUtil {
   }
 
   def getThriftPort(): Int = {
-    Util.findFreePort.getOrElse(
+    Util.findFreePort().getOrElse(
       throw new Exception("unable to find free port for thrift server"))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
index 200807c..ed4a6bb 100644
--- 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
+++ 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,13 +19,15 @@
 package io.gearpump.experiments.storm.partitioner
 
 import java.util.{List => JList}
-import io.gearpump.Message
-import io.gearpump.experiments.storm.topology.GearpumpTuple
-import io.gearpump.partitioner.Partitioner
+import scala.collection.JavaConverters._
+
 import org.scalacheck.Gen
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
-import scala.collection.JavaConverters._
+
+import io.gearpump.Message
+import io.gearpump.experiments.storm.topology.GearpumpTuple
+import io.gearpump.partitioner.Partitioner
 
 class StormPartitionerSpec extends PropSpec with PropertyChecks with Matchers {
 
@@ -38,30 +40,28 @@ class StormPartitionerSpec extends PropSpec with 
PropertyChecks with Matchers {
       sourceTaskId <- idGen
       sourceStreamId <- Gen.alphaStr
     } yield (targetPartitions: Map[String, Array[Int]]) => {
-        new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, 
targetPartitions)
-      }
+      new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, 
targetPartitions)
+    }
 
     forAll(tupleFactoryGen, idGen, componentsGen, partitionsGen) {
-      (tupleFactory: Map[String, Array[Int]] => GearpumpTuple, id: Int, 
components: List[String], partitions: Array[Int]) =>
+      (tupleFactory: Map[String, Array[Int]] => GearpumpTuple, id: Int,
+        components: List[String], partitions: Array[Int]) => {
         val currentPartitionId = id
         val targetPartitions = components.init.map(c => (c, partitions)).toMap
         val tuple = tupleFactory(targetPartitions)
-        targetPartitions.foreach { case (target, ps) =>
-          val partitioner = new StormPartitioner(target)
-          partitioner.getPartitions(Message(tuple), ps.last + 1,
-            currentPartitionId) shouldBe ps
+        targetPartitions.foreach {
+          case (target, ps) => {
+            val partitioner = new StormPartitioner(target)
+            ps shouldBe partitioner.getPartitions(Message(tuple), ps.last + 1, 
currentPartitionId)
+          }
         }
         val partitionNum = id
         val nonTarget = components.last
         val partitioner = new StormPartitioner(nonTarget)
-        if (targetPartitions.contains(nonTarget)) {
-          println(targetPartitions)
-        }
+
         partitioner.getPartitions(Message(tuple), partitionNum,
           currentPartitionId) shouldBe List(Partitioner.UNKNOWN_PARTITION_ID)
-
+      }
     }
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala
 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala
index d27a754..64cabd5 100644
--- 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala
+++ 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,19 +19,20 @@
 package io.gearpump.experiments.storm.processor
 
 import java.util.{List => JList}
+import scala.collection.JavaConverters._
 
 import backtype.storm.tuple.Tuple
 import backtype.storm.utils.Utils
-import io.gearpump.experiments.storm.util.StormOutputCollector
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-import scala.collection.JavaConversions._
+import io.gearpump.experiments.storm.util.StormOutputCollector
 
-class StormBoltOutputCollectorSpec extends PropSpec with PropertyChecks with 
Matchers with MockitoSugar {
+class StormBoltOutputCollectorSpec
+  extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
   property("StormBoltOutputCollector should call StormOutputCollector") {
     val valGen = Gen.oneOf(Gen.alphaStr, Gen.alphaChar, Gen.chooseNum[Int](0, 
1000))
@@ -41,8 +42,8 @@ class StormBoltOutputCollectorSpec extends PropSpec with 
PropertyChecks with Mat
       val collector = mock[StormOutputCollector]
       val boltCollector = new StormBoltOutputCollector(collector)
       val streamId = Utils.DEFAULT_STREAM_ID
-      boltCollector.emit(streamId, null, values)
-      verify(collector).emit(streamId, values)
+      boltCollector.emit(streamId, null, values.asJava)
+      verify(collector).emit(streamId, values.asJava)
     }
   }
 
@@ -50,6 +51,6 @@ class StormBoltOutputCollectorSpec extends PropSpec with 
PropertyChecks with Mat
     val collector = mock[StormOutputCollector]
     val tuple = mock[Tuple]
     val boltCollector = new StormBoltOutputCollector(collector)
-    an [Exception] should be thrownBy boltCollector.fail(tuple)
+    an[Exception] should be thrownBy boltCollector.fail(tuple)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormProcessorSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormProcessorSpec.scala
 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormProcessorSpec.scala
index 6a74b24..a3a8196 100644
--- 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormProcessorSpec.scala
+++ 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormProcessorSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,14 +18,15 @@
 
 package io.gearpump.experiments.storm.processor
 
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, WordSpec}
+
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
-import 
io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, 
GearpumpSpout}
+import 
io.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt
 import io.gearpump.streaming.MockUtil
 import io.gearpump.streaming.task.StartTime
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{WordSpec, Matchers}
 
 class StormProcessorSpec extends WordSpec with Matchers with MockitoSugar {
 
@@ -61,6 +62,5 @@ class StormProcessorSpec extends WordSpec with Matchers with 
MockitoSugar {
       verify(gearpumpBolt).tick(freq)
     }
   }
-
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormProducerSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormProducerSpec.scala
 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormProducerSpec.scala
index 98ca2d5..8c10afc 100644
--- 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormProducerSpec.scala
+++ 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormProducerSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,14 +19,15 @@
 package io.gearpump.experiments.storm.producer
 
 import akka.testkit.TestProbe
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, WordSpec}
+
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
 import 
io.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout
 import io.gearpump.streaming.MockUtil
 import io.gearpump.streaming.task.StartTime
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{WordSpec, Matchers}
-import org.mockito.Mockito._
 
 class StormProducerSpec extends WordSpec with Matchers with MockitoSugar {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
index d202446..e638da9 100644
--- 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
+++ 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,19 +19,20 @@
 package io.gearpump.experiments.storm.producer
 
 import java.util.{List => JList}
+import scala.collection.JavaConverters._
 
 import backtype.storm.spout.ISpout
 import backtype.storm.utils.Utils
-import io.gearpump.experiments.storm.util.StormOutputCollector
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-import scala.collection.JavaConversions._
+import io.gearpump.experiments.storm.util.StormOutputCollector
 
-class StormSpoutOutputCollectorSpec extends PropSpec with PropertyChecks with 
Matchers with MockitoSugar {
+class StormSpoutOutputCollectorSpec
+  extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
   property("StormSpoutOutputCollector should call StormOutputCollector") {
     val valGen = Gen.oneOf(Gen.alphaStr, Gen.alphaChar, Gen.chooseNum[Int](0, 
1000))
@@ -42,9 +43,8 @@ class StormSpoutOutputCollectorSpec extends PropSpec with 
PropertyChecks with Ma
       val spout = mock[ISpout]
       val streamId = Utils.DEFAULT_STREAM_ID
       val spoutCollector = new StormSpoutOutputCollector(collector, spout, 
false)
-      spoutCollector.emit(streamId, values, null)
-      verify(collector).emit(streamId, values)
+      spoutCollector.emit(streamId, values.asJava, null)
+      verify(collector).emit(streamId, values.asJava)
     }
   }
-
 }

Reply via email to