http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
new file mode 100644
index 0000000..f95b840
--- /dev/null
+++ 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util
+
+import java.util.{ArrayList => JArrayList, Iterator => JIterator, List => 
JList, Map => JMap}
+import scala.collection.JavaConverters._
+
+import backtype.storm.generated.{GlobalStreamId, Grouping, JavaObject}
+import backtype.storm.grouping.CustomStreamGrouping
+import backtype.storm.task.TopologyContext
+import backtype.storm.tuple.Fields
+import backtype.storm.utils.Utils
+import org.slf4j.Logger
+
+import org.apache.gearpump._
+import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
+import org.apache.gearpump.experiments.storm.util.StormUtil._
+import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.streaming.task.{TaskContext, TaskId}
+import org.apache.gearpump.util.LogUtil
+
+object StormOutputCollector {
+  private val LOG: Logger = LogUtil.getLogger(classOf[StormOutputCollector])
+  private[storm] val EMPTY_LIST: JList[Integer] = new JArrayList[Integer](0)
+
+  /**
+   * Builds a collector for the Storm component running in the given task.
+   *
+   * A [[Grouper]] is created for every non-direct (stream, target component)
+   * subscription of this component. Direct groupings get no grouper because
+   * `emitDirect` addresses the target task explicitly.
+   *
+   * @param taskContext Gearpump task context the messages are output to
+   * @param topologyContext Storm context describing the current task
+   */
+  def apply(taskContext: TaskContext, topologyContext: TopologyContext)
+    : StormOutputCollector = {
+    val stormTaskId = topologyContext.getThisTaskId
+    val componentId = topologyContext.getThisComponentId
+    val taskToComponent = topologyContext.getTaskToComponent
+    val componentToProcessorId = getComponentToProcessorId(taskToComponent.asScala.toMap)
+    val targets = topologyContext.getTargets(componentId)
+    // Keyed by (streamId, target): several components may subscribe to the same
+    // stream with different groupings, so a single grouper per stream would let
+    // one target's grouping silently replace the others.
+    val streamGroupers: Map[(String, String), Grouper] =
+      targets.asScala.flatMap { case (streamId, targetGrouping) =>
+        targetGrouping.asScala.collect {
+          case (target, grouping) if !grouping.is_set_direct() =>
+            (streamId, target) ->
+              getGrouper(topologyContext, grouping, componentId, streamId, target)
+        }
+      }.toMap
+    val getTargetPartitionsFn = (streamId: String, values: JList[AnyRef]) => {
+      getTargetPartitions(stormTaskId, streamId, targets,
+        streamGroupers, componentToProcessorId, values)
+    }
+    new StormOutputCollector(stormTaskId, taskToComponent, targets, getTargetPartitionsFn,
+      taskContext, LatestTime)
+  }
+
+  /**
+   * get target Gearpump partitions and Storm task ids
+   *
+   * Every target component subscribed to `streamId` is asked, through its own
+   * grouper, which partitions should receive `values`.
+   *
+   * @return the chosen partitions per target component, and the matching
+   *         Storm task ids of all chosen partitions
+   * @throws NoSuchElementException if a target of the stream has no grouper
+   *         (i.e. it subscribed with a direct grouping)
+   */
+  private def getTargetPartitions(
+      stormTaskId: Int,
+      streamId: String,
+      targets: JMap[String, JMap[String, Grouping]],
+      streamGroupers: Map[(String, String), Grouper],
+      componentToProcessorId: Map[String, ProcessorId],
+      values: JList[AnyRef]): (Map[String, Array[Int]], JList[Integer]) = {
+    val ret: JList[Integer] = new JArrayList[Integer](targets.size)
+
+    @annotation.tailrec
+    def getRecur(iter: JIterator[String],
+        accum: Map[String, Array[Int]]): Map[String, Array[Int]] = {
+      if (iter.hasNext) {
+        val target = iter.next
+        val grouper = streamGroupers.getOrElse((streamId, target),
+          throw new NoSuchElementException(
+            s"no grouper for stream $streamId and target $target"))
+        val partitions = grouper.getPartitions(stormTaskId, values)
+        partitions.foreach { p =>
+          // translate (processor, partition) back into the Storm task id space
+          val targetStormTaskId =
+            gearpumpTaskIdToStorm(TaskId(componentToProcessorId(target), p))
+          ret.add(targetStormTaskId)
+        }
+        getRecur(iter, accum + (target -> partitions))
+      } else {
+        accum
+      }
+    }
+    val targetPartitions = getRecur(targets.get(streamId).keySet().iterator,
+      Map.empty[String, Array[Int]])
+    (targetPartitions, ret)
+  }
+
+  /**
+   * Maps every Storm component id to the Gearpump processor id encoded in
+   * the Storm task ids of that component.
+   */
+  private def getComponentToProcessorId(taskToComponent: Map[Integer, String])
+    : Map[String, ProcessorId] = {
+    taskToComponent.map { case (id, component) =>
+      component -> stormTaskIdToGearpump(id).processorId
+    }
+  }
+
+  /**
+   * Translates a Storm [[Grouping]] between `source` and `target` on `streamId`
+   * into the matching [[Grouper]].
+   *
+   * A fields grouping with an empty field list is treated as a global grouping.
+   *
+   * @throws Exception for a direct grouping, which must not reach this method
+   */
+  private def getGrouper(topologyContext: TopologyContext, grouping: Grouping,
+      source: String, streamId: String, target: String): Grouper = {
+    val outFields = topologyContext.getComponentOutputFields(source, streamId)
+    val targetTasks = topologyContext.getComponentTasks(target)
+    val targetTaskNum = targetTasks.size
+    val globalStreamId = new GlobalStreamId(source, streamId)
+
+    // wraps a user-provided grouping and prepares it against the target tasks
+    def prepareCustomGrouper(customGrouping: CustomStreamGrouping): Grouper = {
+      val grouper = new CustomGrouper(customGrouping)
+      grouper.prepare(topologyContext, globalStreamId, targetTasks)
+      grouper
+    }
+
+    grouping.getSetField match {
+      case Grouping._Fields.FIELDS =>
+        if (isGlobalGrouping(grouping)) {
+          new GlobalGrouper
+        } else {
+          new FieldsGrouper(outFields, new Fields(grouping.get_fields()), targetTaskNum)
+        }
+      case Grouping._Fields.SHUFFLE =>
+        new ShuffleGrouper(targetTaskNum)
+      case Grouping._Fields.NONE =>
+        new NoneGrouper(targetTaskNum)
+      case Grouping._Fields.ALL =>
+        new AllGrouper(targetTaskNum)
+      case Grouping._Fields.CUSTOM_SERIALIZED =>
+        val customGrouping = Utils.javaDeserialize(grouping.get_custom_serialized,
+          classOf[Serializable]).asInstanceOf[CustomStreamGrouping]
+        prepareCustomGrouper(customGrouping)
+      case Grouping._Fields.CUSTOM_OBJECT =>
+        val customObject = grouping.get_custom_object()
+        prepareCustomGrouper(instantiateJavaObject(customObject))
+      case Grouping._Fields.LOCAL_OR_SHUFFLE =>
+        // Gearpump has built-in support for sending messages to local actor
+        new ShuffleGrouper(targetTaskNum)
+      case Grouping._Fields.DIRECT =>
+        throw new Exception("direct grouping should not be called here")
+    }
+  }
+
+  /**
+   * A fields grouping that lists no fields is how a global grouping is detected here.
+   */
+  private def isGlobalGrouping(grouping: Grouping): Boolean = {
+    grouping.getSetField == Grouping._Fields.FIELDS &&
+      grouping.get_fields.isEmpty
+  }
+
+  /**
+   * Reflectively instantiates the custom grouping described by `javaObject`.
+   *
+   * The constructor is looked up by the runtime classes of the arguments, and
+   * the arguments are then spread as individual constructor parameters.
+   */
+  private def instantiateJavaObject(javaObject: JavaObject): CustomStreamGrouping = {
+    val className = javaObject.get_full_class_name()
+    val args = javaObject.get_args_list().asScala.map(_.getFieldValue)
+    // `args: _*` is required: passing the buffer itself would hand the
+    // constructor a single argument (the buffer) instead of its elements
+    Class.forName(className).getConstructor(args.map(_.getClass): _*)
+      .newInstance(args: _*).asInstanceOf[CustomStreamGrouping]
+  }
+}
+
+/**
+ * Emit logic shared by
+ * [[org.apache.gearpump.experiments.storm.producer.StormSpoutOutputCollector]]
+ * and [[org.apache.gearpump.experiments.storm.processor.StormBoltOutputCollector]]
+ */
+class StormOutputCollector(
+    stormTaskId: Int,
+    taskToComponent: JMap[Integer, String],
+    targets: JMap[String, JMap[String, Grouping]],
+    getTargetPartitionsFn: (String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer]),
+    val taskContext: TaskContext,
+    private var timestamp: TimeStamp) {
+  import org.apache.gearpump.experiments.storm.util.StormOutputCollector._
+
+  /**
+   * Emits tuple values into a stream (invoked by a Storm output collector).
+   *
+   * Wraps the values into a [[GearpumpTuple]] message together with the target
+   * partitions, which tell
+   * [[org.apache.gearpump.experiments.storm.partitioner.StormPartitioner]]
+   * where to send the message. Nothing is output when the stream has no targets.
+   *
+   * @param streamId Storm stream id
+   * @param values Storm tuple values
+   * @return Target Storm task ids; an empty list if the stream has no targets
+   */
+  def emit(streamId: String, values: JList[AnyRef]): JList[Integer] = {
+    if (!targets.containsKey(streamId)) {
+      EMPTY_LIST
+    } else {
+      val (partitionsByTarget, stormTaskIds) = getTargetPartitionsFn(streamId, values)
+      send(streamId, values, partitionsByTarget)
+      stormTaskIds
+    }
+  }
+
+  /**
+   * Emits tuple values to one specific Storm task (invoked by Storm output collector).
+   *
+   * The Storm task id is translated into its component and Gearpump partition
+   * index, which are handed to
+   * [[org.apache.gearpump.experiments.storm.partitioner.StormPartitioner]] through
+   * the targetPartitions field of
+   * [[org.apache.gearpump.experiments.storm.topology.GearpumpTuple]].
+   *
+   * @param id Storm task id
+   * @param streamId Storm stream id
+   * @param values Storm tuple values
+   */
+  def emitDirect(id: Int, streamId: String, values: JList[AnyRef]): Unit = {
+    if (targets.containsKey(streamId)) {
+      val component = taskToComponent.get(id)
+      val index = stormTaskIdToGearpump(id).index
+      send(streamId, values, Map(component -> Array(index)))
+    }
+  }
+
+  /**
+   * Sets the timestamp attached to the messages emitted from now on.
+   */
+  def setTimestamp(timestamp: TimeStamp): Unit = {
+    this.timestamp = timestamp
+  }
+
+  def getTimestamp: Long = timestamp
+
+  /**
+   * Outputs the values as a [[GearpumpTuple]] message carrying the current timestamp.
+   */
+  private def send(
+      streamId: String,
+      values: JList[AnyRef],
+      targetPartitions: Map[String, Array[Int]]): Unit = {
+    val tuple = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions)
+    taskContext.output(Message(tuple, timestamp))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormSerializationFramework.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormSerializationFramework.scala
 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormSerializationFramework.scala
new file mode 100644
index 0000000..8bffc55
--- /dev/null
+++ 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormSerializationFramework.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.storm.util
+
+import java.lang.{Integer => JInteger}
+import java.util.{Map => JMap}
+
+import akka.actor.ExtendedActorSystem
+import backtype.storm.serialization.SerializationFactory
+import backtype.storm.utils.ListDelegate
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.{Input, Output}
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
+import org.apache.gearpump.experiments.storm.util.StormConstants._
+import org.apache.gearpump.serializer.{SerializationFramework, Serializer}
+
+class StormSerializationFramework extends SerializationFramework {
+  // both fields stay null until init is called
+  private var stormConfig: JMap[AnyRef, AnyRef] = null
+  private var pool: ThreadLocal[Serializer] = null
+
+  /**
+   * Reads the Storm config from the user config and sets up a per-thread
+   * [[StormSerializer]], each backed by its own Kryo built from that config.
+   */
+  override def init(system: ExtendedActorSystem, config: UserConfig): Unit = {
+    // NOTE(review): presumably consumed implicitly by config.getValue - confirm
+    implicit val actorSystem = system
+    stormConfig = config.getValue[JMap[AnyRef, AnyRef]](STORM_CONFIG).get
+    pool = new ThreadLocal[Serializer]() {
+      override def initialValue(): Serializer =
+        new StormSerializer(SerializationFactory.getKryo(stormConfig))
+    }
+  }
+
+  /**
+   * @return the serializer owned by the calling thread
+   */
+  override def get(): Serializer = pool.get()
+}
+
+/**
+ * serializes / deserializes
+ * [[org.apache.gearpump.experiments.storm.topology.GearpumpTuple]]
+ *
+ * Only the source task id, the source stream id and the values are written;
+ * target partitions are not part of the wire format.
+ *
+ * @param kryo created by Storm [[backtype.storm.serialization.SerializationFactory]]
+ */
+class StormSerializer(kryo: Kryo) extends Serializer {
+  // -1 means the max buffer size is 2147483647
+  private val output = new Output(4096, -1)
+  private val input = new Input
+
+  override def serialize(message: Any): Array[Byte] = {
+    val gearpumpTuple = message.asInstanceOf[GearpumpTuple]
+    // the output buffer is reused across calls, so reset it first
+    output.clear()
+    output.writeInt(gearpumpTuple.sourceTaskId)
+    output.writeString(gearpumpTuple.sourceStreamId)
+    val delegate = new ListDelegate
+    delegate.setDelegate(gearpumpTuple.values)
+    kryo.writeObject(output, delegate)
+    output.toBytes
+  }
+
+  override def deserialize(msg: Array[Byte]): Any = {
+    input.setBuffer(msg)
+    // fields are read back in exactly the order serialize wrote them
+    val taskId: JInteger = input.readInt
+    val streamId: String = input.readString
+    val delegate = kryo.readObject[ListDelegate](input, classOf[ListDelegate])
+    // target partitions were never serialized, hence null
+    new GearpumpTuple(delegate.getDelegate, taskId, streamId, null)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormUtil.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormUtil.scala
 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormUtil.scala
new file mode 100644
index 0000000..40e36a6
--- /dev/null
+++ 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormUtil.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util
+
+import java.lang.{Boolean => JBoolean}
+import java.util.{HashMap => JHashMap, Map => JMap}
+
+import akka.actor.ActorSystem
+import backtype.storm.Config
+import backtype.storm.generated._
+import org.apache.storm.shade.org.json.simple.JSONValue
+
+import org.apache.gearpump.cluster.UserConfig
+import 
org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt,
 GearpumpSpout}
+import org.apache.gearpump.experiments.storm.topology._
+import org.apache.gearpump.experiments.storm.util.StormConstants._
+import org.apache.gearpump.streaming.task.{TaskContext, TaskId}
+import org.apache.gearpump.util.Util
+
+object StormUtil {
+
+  /**
+   * Decodes a storm task id into a gearpump
+   * [[org.apache.gearpump.streaming.task.TaskId]]
+   *
+   * The high 16 bits of the Int are TaskId.processorId
+   * The low 16 bits of the Int are TaskId.index
+   */
+  def stormTaskIdToGearpump(id: Integer): TaskId = {
+    TaskId(id >> 16, id & 0xFFFF)
+  }
+
+  /**
+   * Encodes a gearpump [[TaskId]] as a storm task id
+   * TaskId.processorId becomes the high 16 bits of the Int
+   * TaskId.index becomes the low 16 bits of the Int
+   */
+  def gearpumpTaskIdToStorm(taskId: TaskId): Integer = {
+    val high = taskId.processorId << 16
+    val low = taskId.index & 0xFFFF
+    high + low
+  }
+
+  /**
+   * Looks up the Storm component named in the user config among the spouts,
+   * then the bolts, of the configured topology and wraps it.
+   *
+   * @return a configured [[GearpumpStormComponent]]
+   */
+  def getGearpumpStormComponent(
+      taskContext: TaskContext, conf: UserConfig)(implicit system: ActorSystem)
+    : GearpumpStormComponent = {
+    val stormTopology = conf.getValue[StormTopology](STORM_TOPOLOGY).get
+    val config = conf.getValue[JMap[AnyRef, AnyRef]](STORM_CONFIG).get
+    val id = conf.getString(STORM_COMPONENT).get
+    val spoutSpecs = stormTopology.get_spouts
+    val boltSpecs = stormTopology.get_bolts
+    if (spoutSpecs.containsKey(id)) {
+      GearpumpSpout(stormTopology, config, spoutSpecs.get(id), taskContext)
+    } else if (!boltSpecs.containsKey(id)) {
+      throw new Exception(s"storm component $id not found")
+    } else {
+      GearpumpBolt(stormTopology, config, boltSpecs.get(id), taskContext)
+    }
+  }
+
+  /**
+   * Parses config in json to map; a null string, or anything that does not
+   * parse to a map, yields a new empty map
+   *
+   * @param json config in json
+   * @return config in map
+   */
+  def parseJsonStringToMap(json: String): JMap[AnyRef, AnyRef] = {
+    if (json == null) {
+      new JHashMap[AnyRef, AnyRef]
+    } else {
+      JSONValue.parse(json) match {
+        case parsed: JMap[_, _] => parsed.asInstanceOf[JMap[AnyRef, AnyRef]]
+        case _ => new JHashMap[AnyRef, AnyRef]
+      }
+    }
+  }
+
+  /**
+   * Looks up the config name and returns its Int value.
+   *
+   * @return None if the name is absent or mapped to null
+   * @throws IllegalArgumentException if the value is not a Number
+   */
+  def getInt(conf: JMap[_, _], name: String): Option[Int] = {
+    Option(conf.get(name)) match {
+      case None => None
+      case Some(number: Number) => Some(number.intValue)
+      case Some(invalid) => throw new IllegalArgumentException(
+        s"$name must be Java Integer; actual: ${invalid.getClass}")
+    }
+  }
+
+  /**
+   * get Boolean value of the config name
+   *
+   * @return None if the name is absent or mapped to null
+   * @throws IllegalArgumentException if the value is not a Java Boolean
+   */
+  def getBoolean(conf: JMap[_, _], name: AnyRef): Option[Boolean] = {
+    Option(conf.get(name)).map {
+      case b: JBoolean => b.booleanValue()
+      case invalid => throw new IllegalArgumentException(
+        s"$name must be a Java Boolean; actual: ${invalid.getClass}")
+    }
+  }
+
+  /**
+   * clojure mod ported from Storm
+   * see https://clojuredocs.org/clojure.core/mod
+   *
+   * The result takes the sign of `div`. The intermediate sum is computed in
+   * Long because `num % div + div` can overflow Int for large divisors.
+   *
+   * @throws ArithmeticException if `div` is 0
+   */
+  def mod(num: Int, div: Int): Int = {
+    val divisor = div.toLong
+    // the final remainder always lies strictly between -|div| and |div|
+    (((num % divisor) + divisor) % divisor).toInt
+  }
+
+  /**
+   * Acking is off when the acker-executors setting is missing or 0; it is on
+   * for any other number, and also when the key is present with a null value.
+   */
+  def ackEnabled(config: JMap[AnyRef, AnyRef]): Boolean = {
+    config.containsKey(Config.TOPOLOGY_ACKER_EXECUTORS) &&
+      getInt(config, Config.TOPOLOGY_ACKER_EXECUTORS).forall(_ != 0)
+  }
+
+  /**
+   * @return a free port for the thrift server
+   * @throws Exception if no free port could be found
+   */
+  def getThriftPort(): Int = {
+    val freePort = Util.findFreePort()
+    freePort.getOrElse {
+      throw new Exception("unable to find free port for thrift server")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
deleted file mode 100644
index ed4a6bb..0000000
--- 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.partitioner
-
-import java.util.{List => JList}
-import scala.collection.JavaConverters._
-
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.Message
-import io.gearpump.experiments.storm.topology.GearpumpTuple
-import io.gearpump.partitioner.Partitioner
-
-class StormPartitionerSpec extends PropSpec with PropertyChecks with Matchers {
-
-  property("StormPartitioner should get partitions directed by message and 
target") {
-    val idGen = Gen.chooseNum[Int](0, Int.MaxValue)
-    val componentsGen = 
Gen.listOf[String](Gen.alphaStr).map(_.distinct).suchThat(_.size > 1)
-    val partitionsGen = 
Gen.listOf[Int](idGen).suchThat(_.nonEmpty).map(_.distinct.sorted.toArray)
-    val tupleFactoryGen = for {
-      values <- 
Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]])
-      sourceTaskId <- idGen
-      sourceStreamId <- Gen.alphaStr
-    } yield (targetPartitions: Map[String, Array[Int]]) => {
-      new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, 
targetPartitions)
-    }
-
-    forAll(tupleFactoryGen, idGen, componentsGen, partitionsGen) {
-      (tupleFactory: Map[String, Array[Int]] => GearpumpTuple, id: Int,
-        components: List[String], partitions: Array[Int]) => {
-        val currentPartitionId = id
-        val targetPartitions = components.init.map(c => (c, partitions)).toMap
-        val tuple = tupleFactory(targetPartitions)
-        targetPartitions.foreach {
-          case (target, ps) => {
-            val partitioner = new StormPartitioner(target)
-            ps shouldBe partitioner.getPartitions(Message(tuple), ps.last + 1, 
currentPartitionId)
-          }
-        }
-        val partitionNum = id
-        val nonTarget = components.last
-        val partitioner = new StormPartitioner(nonTarget)
-
-        partitioner.getPartitions(Message(tuple), partitionNum,
-          currentPartitionId) shouldBe List(Partitioner.UNKNOWN_PARTITION_ID)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala
 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala
deleted file mode 100644
index 64cabd5..0000000
--- 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.processor
-
-import java.util.{List => JList}
-import scala.collection.JavaConverters._
-
-import backtype.storm.tuple.Tuple
-import backtype.storm.utils.Utils
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.experiments.storm.util.StormOutputCollector
-
-class StormBoltOutputCollectorSpec
-  extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
-  property("StormBoltOutputCollector should call StormOutputCollector") {
-    val valGen = Gen.oneOf(Gen.alphaStr, Gen.alphaChar, Gen.chooseNum[Int](0, 
1000))
-    val valuesGen = Gen.listOf[AnyRef](valGen)
-
-    forAll(valuesGen) { (values: List[AnyRef]) =>
-      val collector = mock[StormOutputCollector]
-      val boltCollector = new StormBoltOutputCollector(collector)
-      val streamId = Utils.DEFAULT_STREAM_ID
-      boltCollector.emit(streamId, null, values.asJava)
-      verify(collector).emit(streamId, values.asJava)
-    }
-  }
-
-  property("StormBoltOutputCollector should throw on fail") {
-    val collector = mock[StormOutputCollector]
-    val tuple = mock[Tuple]
-    val boltCollector = new StormBoltOutputCollector(collector)
-    an[Exception] should be thrownBy boltCollector.fail(tuple)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormProcessorSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormProcessorSpec.scala
 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormProcessorSpec.scala
deleted file mode 100644
index a3a8196..0000000
--- 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormProcessorSpec.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.processor
-
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import 
io.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-
-class StormProcessorSpec extends WordSpec with Matchers with MockitoSugar {
-
-  "StormProcessor" should {
-    "start GearpumpSpout onStart" in {
-      val startTime = mock[StartTime]
-      val gearpumpBolt = mock[GearpumpBolt]
-      when(gearpumpBolt.getTickFrequency).thenReturn(None)
-      val taskContext = MockUtil.mockTaskContext
-      val userConfig = UserConfig.empty
-      val stormProcessor = new StormProcessor(gearpumpBolt, taskContext, 
userConfig)
-
-      stormProcessor.onStart(startTime)
-
-      verify(gearpumpBolt).start(startTime)
-    }
-
-    "pass message to GearpumpBolt onNext" in {
-      val message = mock[Message]
-      val gearpumpBolt = mock[GearpumpBolt]
-      val freq = 5
-      when(gearpumpBolt.getTickFrequency).thenReturn(Some(freq))
-      val taskContext = MockUtil.mockTaskContext
-      val userConfig = UserConfig.empty
-      val stormProcessor = new StormProcessor(gearpumpBolt, taskContext, 
userConfig)
-
-      stormProcessor.onNext(message)
-
-      verify(gearpumpBolt).next(message)
-
-      stormProcessor.onNext(StormProcessor.TICK)
-
-      verify(gearpumpBolt).tick(freq)
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormProducerSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormProducerSpec.scala
 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormProducerSpec.scala
deleted file mode 100644
index 8c10afc..0000000
--- 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormProducerSpec.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.producer
-
-import akka.testkit.TestProbe
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import 
io.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-
-class StormProducerSpec extends WordSpec with Matchers with MockitoSugar {
-
-  "StormProducer" should {
-    "start GearpumpSpout onStart" in {
-      val startTime = mock[StartTime]
-      val gearpumpSpout = mock[GearpumpSpout]
-      when(gearpumpSpout.getMessageTimeout).thenReturn(None)
-      val taskContext = MockUtil.mockTaskContext
-      implicit val actorSystem = taskContext.system
-      val taskActor = TestProbe()
-      when(taskContext.self).thenReturn(taskActor.ref)
-      val userConfig = UserConfig.empty
-      val stormProducer = new StormProducer(gearpumpSpout, taskContext, 
userConfig)
-
-      stormProducer.onStart(startTime)
-
-      verify(gearpumpSpout).start(startTime)
-      taskActor.expectMsg(Message("start"))
-    }
-
-    "pass message to GearpumpBolt onNext" in {
-      val message = mock[Message]
-      val gearpumpSpout = mock[GearpumpSpout]
-      val timeout = 5L
-      when(gearpumpSpout.getMessageTimeout).thenReturn(Some(timeout))
-      val taskContext = MockUtil.mockTaskContext
-      implicit val actorSystem = taskContext.system
-      val taskActor = TestProbe()
-      when(taskContext.self).thenReturn(taskActor.ref)
-      val userConfig = UserConfig.empty
-      val stormProducer = new StormProducer(gearpumpSpout, taskContext, 
userConfig)
-
-      stormProducer.onNext(message)
-
-      verify(gearpumpSpout).next(message)
-      taskActor.expectMsg(Message("continue"))
-
-      stormProducer.onNext(StormProducer.TIMEOUT)
-      verify(gearpumpSpout).timeout(timeout)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
deleted file mode 100644
index e638da9..0000000
--- 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.producer
-
-import java.util.{List => JList}
-import scala.collection.JavaConverters._
-
-import backtype.storm.spout.ISpout
-import backtype.storm.utils.Utils
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.experiments.storm.util.StormOutputCollector
-
-class StormSpoutOutputCollectorSpec
-  extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
-  property("StormSpoutOutputCollector should call StormOutputCollector") {
-    val valGen = Gen.oneOf(Gen.alphaStr, Gen.alphaChar, Gen.chooseNum[Int](0, 
1000))
-    val valuesGen = Gen.listOf[AnyRef](valGen)
-
-    forAll(valuesGen) { (values: List[AnyRef]) =>
-      val collector = mock[StormOutputCollector]
-      val spout = mock[ISpout]
-      val streamId = Utils.DEFAULT_STREAM_ID
-      val spoutCollector = new StormSpoutOutputCollector(collector, spout, 
false)
-      spoutCollector.emit(streamId, values.asJava, null)
-      verify(collector).emit(streamId, values.asJava)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
deleted file mode 100644
index 319ee27..0000000
--- 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.storm.topology
-
-import java.util.{Map => JMap}
-
-import akka.actor.ActorRef
-import backtype.storm.spout.{ISpout, SpoutOutputCollector}
-import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, 
TopologyContext}
-import backtype.storm.tuple.Tuple
-import org.mockito.Matchers.{anyObject, eq => mockitoEq}
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.experiments.storm.producer.StormSpoutOutputCollector
-import 
io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, 
GearpumpSpout}
-import io.gearpump.experiments.storm.util.StormOutputCollector
-import io.gearpump.streaming.task.{StartTime, TaskContext, TaskId}
-import io.gearpump.streaming.{DAG, MockUtil}
-import io.gearpump.{Message, TimeStamp}
-
-class GearpumpStormComponentSpec
-  extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
-  property("GearpumpSpout lifecycle") {
-    val config = mock[JMap[AnyRef, AnyRef]]
-    val spout = mock[ISpout]
-    val taskContext = MockUtil.mockTaskContext
-    val appMaster = mock[ActorRef]
-    when(taskContext.appMaster).thenReturn(appMaster)
-    val getDAG = mock[ActorRef => DAG]
-    val dag = mock[DAG]
-    when(getDAG(appMaster)).thenReturn(dag)
-    val getTopologyContext = mock[(DAG, TaskId) => TopologyContext]
-    val topologyContext = mock[TopologyContext]
-    when(getTopologyContext(dag, 
taskContext.taskId)).thenReturn(topologyContext)
-    val getOutputCollector = mock[(TaskContext, TopologyContext) => 
StormSpoutOutputCollector]
-    val outputCollector = mock[StormSpoutOutputCollector]
-    when(getOutputCollector(taskContext, 
topologyContext)).thenReturn(outputCollector)
-
-    val gearpumpSpout = GearpumpSpout(config, spout, getDAG, 
getTopologyContext,
-      getOutputCollector, ackEnabled = false, taskContext)
-
-    // Start
-    val startTime = mock[StartTime]
-    gearpumpSpout.start(startTime)
-
-    verify(spout).open(mockitoEq(config), mockitoEq(topologyContext),
-      anyObject[SpoutOutputCollector])
-
-    // Next
-    val message = mock[Message]
-    gearpumpSpout.next(message)
-
-    verify(spout).nextTuple()
-  }
-
-  property("GearpumpBolt lifecycle") {
-    val timestampGen = Gen.chooseNum[Long](0L, 1000L)
-    val freqGen = Gen.chooseNum[Int](1, 100)
-    forAll(timestampGen, freqGen) { (timestamp: TimeStamp, freq: Int) =>
-      val config = mock[JMap[AnyRef, AnyRef]]
-      val bolt = mock[IBolt]
-      val taskContext = MockUtil.mockTaskContext
-      val appMaster = mock[ActorRef]
-      when(taskContext.appMaster).thenReturn(appMaster)
-      val getDAG = mock[ActorRef => DAG]
-      val dag = mock[DAG]
-      when(getDAG(appMaster)).thenReturn(dag)
-      val getTopologyContext = mock[(DAG, TaskId) => TopologyContext]
-      val topologyContext = mock[TopologyContext]
-      when(getTopologyContext(dag, 
taskContext.taskId)).thenReturn(topologyContext)
-      val getGeneralTopologyContext = mock[DAG => GeneralTopologyContext]
-      val generalTopologyContext = mock[GeneralTopologyContext]
-      when(getGeneralTopologyContext(dag)).thenReturn(generalTopologyContext)
-      val getOutputCollector = mock[(TaskContext, TopologyContext) => 
StormOutputCollector]
-      val stormOutputCollector = mock[StormOutputCollector]
-      when(getOutputCollector(taskContext, 
topologyContext)).thenReturn(stormOutputCollector)
-      val getTickTuple = mock[(GeneralTopologyContext, Int) => Tuple]
-      val tickTuple = mock[Tuple]
-      when(getTickTuple(mockitoEq(generalTopologyContext), 
anyObject[Int]())).thenReturn(tickTuple)
-      val gearpumpBolt = GearpumpBolt(config, bolt, getDAG, getTopologyContext,
-        getGeneralTopologyContext, getOutputCollector, getTickTuple, 
taskContext)
-
-      // Start
-      val startTime = mock[StartTime]
-      gearpumpBolt.start(startTime)
-
-      verify(bolt).prepare(mockitoEq(config), mockitoEq(topologyContext),
-        anyObject[OutputCollector])
-
-      // Next
-      val gearpumpTuple = mock[GearpumpTuple]
-      val tuple = mock[Tuple]
-      when(gearpumpTuple.toTuple(generalTopologyContext, 
timestamp)).thenReturn(tuple)
-      val message = Message(gearpumpTuple, timestamp)
-      gearpumpBolt.next(message)
-
-      verify(stormOutputCollector).setTimestamp(timestamp)
-      verify(bolt).execute(tuple)
-
-      // Tick
-      gearpumpBolt.tick(freq)
-      verify(bolt).execute(tickTuple)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
deleted file mode 100644
index f9ffc5f..0000000
--- 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.topology
-
-import java.util.{HashMap => JHashMap, Map => JMap}
-import scala.collection.JavaConverters._
-
-import backtype.storm.Config
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.experiments.storm.processor.StormProcessor
-import io.gearpump.experiments.storm.producer.StormProducer
-import io.gearpump.experiments.storm.util.TopologyUtil
-import io.gearpump.streaming.MockUtil
-
-class GearpumpStormTopologySpec extends WordSpec with Matchers with 
MockitoSugar {
-  import io.gearpump.experiments.storm.topology.GearpumpStormTopologySpec._
-
-  "GearpumpStormTopology" should {
-    "merge configs with defined priority" in {
-      val stormTopology = TopologyUtil.getTestTopology
-      val name = "name"
-      val sysVal = "sys"
-      val sysConfig = newJavaConfig(name, sysVal)
-      val appVal = "app"
-      val appConfig = newJavaConfig(name, appVal)
-
-      implicit val system = MockUtil.system
-      val topology1 = new GearpumpStormTopology("topology1", stormTopology, 
newEmptyConfig,
-        newEmptyConfig)
-      topology1.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology1"
-      topology1.getStormConfig should not contain name
-
-      val topology2 = new GearpumpStormTopology("topology2", stormTopology, 
sysConfig,
-        newEmptyConfig)
-      topology2.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology2"
-      topology2.getStormConfig.get(name) shouldBe sysVal
-
-      val topology3 = new GearpumpStormTopology("topology3", stormTopology, 
sysConfig, appConfig)
-      topology3.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology3"
-      topology3.getStormConfig.get(name) shouldBe appVal
-    }
-
-    "create Gearpump processors from Storm topology" in {
-      val stormTopology = TopologyUtil.getTestTopology
-      implicit val system = MockUtil.system
-      val gearpumpStormTopology =
-        GearpumpStormTopology("app", stormTopology, null)
-      val processors = gearpumpStormTopology.getProcessors
-      stormTopology.get_spouts().asScala.foreach { case (spoutId, _) =>
-        val processor = processors(spoutId)
-        processor.taskClass shouldBe classOf[StormProducer]
-        processor.description shouldBe spoutId
-      }
-      stormTopology.get_bolts().asScala.foreach { case (boltId, _) =>
-        val processor = processors(boltId)
-        processor.taskClass shouldBe classOf[StormProcessor]
-        processor.description shouldBe boltId
-      }
-    }
-
-    "get target processors from source id" in {
-      val stormTopology = TopologyUtil.getTestTopology
-      implicit val system = MockUtil.system
-      val sysConfig = new JHashMap[AnyRef, AnyRef]
-      val gearpumpStormTopology =
-        GearpumpStormTopology("app", stormTopology, null)
-      val targets0 = gearpumpStormTopology.getTargets("1")
-      targets0 should contain key "3"
-      targets0 should contain key "4"
-      val targets1 = gearpumpStormTopology.getTargets("2")
-      targets1 should contain key "3"
-    }
-  }
-}
-
-object GearpumpStormTopologySpec {
-  def newEmptyConfig: JMap[AnyRef, AnyRef] = {
-    new JHashMap[AnyRef, AnyRef]
-  }
-
-  def newJavaConfig(key: AnyRef, value: AnyRef): JMap[AnyRef, AnyRef] = {
-    val config = new JHashMap[AnyRef, AnyRef]
-    config.put(key, value)
-    config
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
deleted file mode 100644
index f454145..0000000
--- 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.storm.topology
-
-import java.util.{List => JList}
-import scala.collection.JavaConverters._
-
-import backtype.storm.task.GeneralTopologyContext
-import backtype.storm.tuple.Fields
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.TimeStamp
-
-class GearpumpTupleSpec extends PropSpec with PropertyChecks with Matchers 
with MockitoSugar {
-
-  property("GearpumpTuple should create Storm Tuple") {
-    val tupleGen = for {
-      values <- 
Gen.listOf[String](Gen.alphaStr).map(_.distinct.asJava.asInstanceOf[JList[AnyRef]])
-      sourceTaskId <- Gen.chooseNum[Int](0, Int.MaxValue)
-      sourceStreamId <- Gen.alphaStr
-    } yield new GearpumpTuple(values, new Integer(sourceTaskId), 
sourceStreamId, null)
-
-    forAll(tupleGen, Gen.alphaStr, Gen.chooseNum[Long](0, Long.MaxValue)) {
-      (gearpumpTuple: GearpumpTuple, componentId: String, timestamp: 
TimeStamp) =>
-        val topologyContext = mock[GeneralTopologyContext]
-        val fields = new 
Fields(gearpumpTuple.values.asScala.map(_.asInstanceOf[String]): _*)
-        
when(topologyContext.getComponentId(gearpumpTuple.sourceTaskId)).thenReturn(componentId)
-        when(topologyContext.getComponentOutputFields(
-          componentId, gearpumpTuple.sourceStreamId)).thenReturn(fields)
-
-        val tuple = gearpumpTuple.toTuple(topologyContext, timestamp)
-
-        tuple shouldBe a[TimedTuple]
-        val timedTuple = tuple.asInstanceOf[TimedTuple]
-        timedTuple.getValues shouldBe gearpumpTuple.values
-        timedTuple.getSourceTask shouldBe gearpumpTuple.sourceTaskId
-        timedTuple.getSourceComponent shouldBe componentId
-        timedTuple.getSourceStreamId shouldBe gearpumpTuple.sourceStreamId
-        timedTuple.getMessageId shouldBe null
-        timedTuple.getFields shouldBe fields
-        timedTuple.timestamp shouldBe timestamp
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala
 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala
deleted file mode 100644
index 27c02fe..0000000
--- 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.util
-
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.experiments.storm.partitioner.StormPartitioner
-import io.gearpump.experiments.storm.topology.GearpumpStormTopology
-import io.gearpump.streaming.Processor
-import io.gearpump.streaming.task.Task
-
-class GraphBuilderSpec extends WordSpec with Matchers with MockitoSugar {
-
-  "GraphBuilder" should {
-    "build Graph from Storm topology" in {
-      val topology = mock[GearpumpStormTopology]
-
-      val sourceId = "source"
-      val sourceProcessor = mock[Processor[Task]]
-      val targetId = "target"
-      val targetProcessor = mock[Processor[Task]]
-
-      when(topology.getProcessors).thenReturn(
-        Map(sourceId -> sourceProcessor, targetId -> targetProcessor))
-      when(topology.getTargets(sourceId)).thenReturn(Map(targetId -> 
targetProcessor))
-      when(topology.getTargets(targetId)).thenReturn(Map.empty[String, 
Processor[Task]])
-
-      val graph = GraphBuilder.build(topology)
-
-      graph.edges.size shouldBe 1
-      val (from, edge, to) = graph.edges.head
-      from shouldBe sourceProcessor
-      edge shouldBe a[StormPartitioner]
-      to shouldBe targetProcessor
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala
 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala
deleted file mode 100644
index 599ea8d..0000000
--- 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.util
-
-import java.util.{List => JList}
-import scala.collection.JavaConverters._
-
-import backtype.storm.generated.GlobalStreamId
-import backtype.storm.grouping.CustomStreamGrouping
-import backtype.storm.task.TopologyContext
-import backtype.storm.tuple.Fields
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.experiments.storm.util.GrouperSpec.Value
-
-class GrouperSpec extends PropSpec with PropertyChecks with Matchers with 
MockitoSugar {
-
-  val taskIdGen = Gen.chooseNum[Int](0, 1000)
-  val valuesGen = Gen.listOf[String](Gen.alphaStr)
-    .map(_.asJava.asInstanceOf[JList[AnyRef]])
-  val numTasksGen = Gen.chooseNum[Int](1, 1000)
-
-  property("GlobalGrouper should always return partition 0") {
-    forAll(taskIdGen, valuesGen) { (taskId: Int, values: JList[AnyRef]) =>
-      val grouper = new GlobalGrouper
-      grouper.getPartitions(taskId, values) shouldBe List(0)
-    }
-  }
-
-  property("NoneGrouper should returns partition in the range [0, numTasks)") {
-    forAll(taskIdGen, valuesGen, numTasksGen) {
-      (taskId: Int, values: JList[AnyRef], numTasks: Int) =>
-        val grouper = new NoneGrouper(numTasks)
-        val partitions = grouper.getPartitions(taskId, values)
-        partitions.size shouldBe 1
-        partitions.head should (be >= 0 and be < numTasks)
-    }
-  }
-
-  property("ShuffleGrouper should return partition in the range [0, 
numTasks)") {
-    forAll(taskIdGen, valuesGen, numTasksGen) {
-      (taskId: Int, values: JList[AnyRef], numTasks: Int) =>
-        val grouper = new ShuffleGrouper(numTasks)
-        val partitions = grouper.getPartitions(taskId, values)
-        partitions.size shouldBe 1
-        partitions.head should (be >= 0 and be < numTasks)
-    }
-  }
-
-  property("FieldsGrouper should return partition according to fields") {
-    forAll(taskIdGen, numTasksGen) {
-      (taskId: Int, numTasks: Int) =>
-        val values = 0.until(numTasks).map(i => new Value(i))
-        val fields = values.map(_.toString)
-        val outFields = new Fields(fields: _*)
-        values.flatMap { v =>
-          val groupFields = new Fields(v.toString)
-          val grouper = new FieldsGrouper(outFields, groupFields, numTasks)
-          grouper.getPartitions(taskId,
-            values.toList.asJava.asInstanceOf[JList[AnyRef]])
-        }.distinct.size shouldBe numTasks
-    }
-  }
-
-  property("AllGrouper should return all partitions") {
-    forAll(taskIdGen, numTasksGen, valuesGen) {
-      (taskId: Int, numTasks: Int, values: JList[AnyRef]) =>
-        val grouper = new AllGrouper(numTasks)
-        val partitions = grouper.getPartitions(taskId, values)
-        partitions.distinct.size shouldBe numTasks
-        partitions.min shouldBe 0
-        partitions.max shouldBe (numTasks - 1)
-    }
-  }
-
-  property("CustomGrouper should return partitions specified by user") {
-    val grouping = mock[CustomStreamGrouping]
-    val grouper = new CustomGrouper(grouping)
-    val topologyContext = mock[TopologyContext]
-    val globalStreamId = mock[GlobalStreamId]
-    val sourceTasks = mock[JList[Integer]]
-
-    grouper.prepare(topologyContext, globalStreamId, sourceTasks)
-
-    verify(grouping).prepare(topologyContext, globalStreamId, sourceTasks)
-
-    forAll(taskIdGen, valuesGen, numTasksGen) {(taskId: Int, values: 
JList[AnyRef], taskNum: Int) =>
-      0.until(taskNum).foreach { i =>
-        when(grouping.chooseTasks(taskId, values)).thenReturn(List(new 
Integer(i)).asJava)
-        grouper.getPartitions(taskId, values) shouldBe List(i)
-      }
-    }
-  }
-}
-
-object GrouperSpec {
-  class Value(val i: Int) extends AnyRef {
-
-    override def toString: String = s"$i"
-
-    override def hashCode(): Int = i
-
-    override def equals(other: Any): Boolean = {
-      if (other.isInstanceOf[Value]) {
-        this.i == other.asInstanceOf[Value].i
-      } else {
-        false
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
deleted file mode 100644
index 1a28694..0000000
--- 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.storm.util
-
-import java.util.{List => JList, Map => JMap}
-import scala.collection.JavaConverters._
-
-import backtype.storm.generated.Grouping
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump._
-import io.gearpump.experiments.storm.topology.GearpumpTuple
-import io.gearpump.streaming.MockUtil
-
-class StormOutputCollectorSpec
-  extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
-  val stormTaskId = 0
-  val streamIdGen = Gen.alphaStr
-  val valuesGen = 
Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]])
-  val timestampGen = Gen.chooseNum[Long](0L, 1000L)
-
-  property("StormOutputCollector emits tuple values into a stream") {
-    forAll(timestampGen, streamIdGen, valuesGen) {
-      (timestamp: TimeStamp, streamId: String, values: JList[AnyRef]) =>
-        val targets = mock[JMap[String, JMap[String, Grouping]]]
-        val taskToComponent = mock[JMap[Integer, String]]
-        val getTargetPartitionsFn = mock[(String, JList[AnyRef]) =>
-          (Map[String, Array[Int]], JList[Integer])]
-        val targetPartitions = mock[Map[String, Array[Int]]]
-        val targetStormTaskIds = mock[JList[Integer]]
-        when(getTargetPartitionsFn(streamId, 
values)).thenReturn((targetPartitions,
-          targetStormTaskIds))
-        val taskContext = MockUtil.mockTaskContext
-        val stormOutputCollector = new StormOutputCollector(stormTaskId, 
taskToComponent,
-          targets, getTargetPartitionsFn, taskContext, LatestTime)
-
-        when(targets.containsKey(streamId)).thenReturn(false)
-        stormOutputCollector.emit(streamId, values) shouldBe 
StormOutputCollector.EMPTY_LIST
-        verify(taskContext, times(0)).output(anyObject[Message])
-
-        when(targets.containsKey(streamId)).thenReturn(true)
-        stormOutputCollector.setTimestamp(timestamp)
-        stormOutputCollector.emit(streamId, values) shouldBe targetStormTaskIds
-        verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({
-          case Message(tuple: GearpumpTuple, t) =>
-            val expected = new GearpumpTuple(values, stormTaskId, streamId, 
targetPartitions)
-            tuple == expected && t == timestamp
-        }))
-    }
-  }
-
-  property("StormOutputCollector emit direct to a task") {
-    val idGen = Gen.chooseNum[Int](0, 1000)
-    val targetGen = Gen.alphaStr
-    forAll(idGen, targetGen, timestampGen, streamIdGen, valuesGen) {
-      (id: Int, target: String, timestamp: Long, streamId: String, values: 
JList[AnyRef]) =>
-        val targets = mock[JMap[String, JMap[String, Grouping]]]
-        val taskToComponent = mock[JMap[Integer, String]]
-        when(taskToComponent.get(id)).thenReturn(target)
-        val getTargetPartitionsFn = mock[(String, JList[AnyRef]) =>
-          (Map[String, Array[Int]], JList[Integer])]
-        val targetPartitions = mock[Map[String, Array[Int]]]
-        val targetStormTaskIds = mock[JList[Integer]]
-        when(getTargetPartitionsFn(streamId, 
values)).thenReturn((targetPartitions,
-          targetStormTaskIds))
-        val taskContext = MockUtil.mockTaskContext
-        val stormOutputCollector = new StormOutputCollector(stormTaskId, 
taskToComponent,
-          targets, getTargetPartitionsFn, taskContext, LatestTime)
-
-        when(targets.containsKey(streamId)).thenReturn(false)
-        verify(taskContext, times(0)).output(anyObject[Message])
-
-        when(targets.containsKey(streamId)).thenReturn(true)
-        stormOutputCollector.setTimestamp(timestamp)
-        stormOutputCollector.emitDirect(id, streamId, values)
-        val partitions = Array(StormUtil.stormTaskIdToGearpump(id).index)
-        verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({
-          case Message(tuple: GearpumpTuple, t) => {
-            val expected = new GearpumpTuple(values, stormTaskId, streamId,
-            Map(target -> partitions))
-
-            val result = tuple == expected && t == timestamp
-            result
-          }
-        }))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala
 
b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala
deleted file mode 100644
index d8a29e8..0000000
--- 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.util
-
-import java.util.{HashMap => JHashMap, List => JList, Map => JMap}
-import scala.collection.JavaConverters._
-
-import akka.actor.ExtendedActorSystem
-import backtype.storm.utils.Utils
-import com.esotericsoftware.kryo.Kryo
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.experiments.storm.topology.GearpumpTuple
-import io.gearpump.experiments.storm.util.StormConstants._
-import io.gearpump.streaming.MockUtil
-
-class StormSerializerPoolSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
-  property("StormSerializerPool should create and manage StormSerializer") {
-    val taskContext = MockUtil.mockTaskContext
-    val serializerPool = new StormSerializationFramework
-    val system = taskContext.system.asInstanceOf[ExtendedActorSystem]
-    implicit val actorSystem = system
-    val stormConfig = Utils.readDefaultConfig.asInstanceOf[JMap[AnyRef, AnyRef]]
-    val config = UserConfig.empty.withValue[JMap[AnyRef, AnyRef]](STORM_CONFIG, stormConfig)
-    serializerPool.init(system, config)
-    serializerPool.get shouldBe a[StormSerializer]
-  }
-
-  property("StormSerializer should serialize and deserialize GearpumpTuple") {
-    val tupleGen = for {
-      values <- Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]])
-      sourceTaskId <- Gen.chooseNum[Int](0, Int.MaxValue)
-      sourceStreamId <- Gen.alphaStr
-    } yield new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, null)
-
-    val kryo = new Kryo
-    forAll(tupleGen) { (tuple: GearpumpTuple) =>
-      val serializer = new StormSerializer(kryo)
-      serializer.deserialize(serializer.serialize(tuple)) shouldBe tuple
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala
deleted file mode 100644
index a9b93c9..0000000
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.util
-
-import java.lang.{Boolean => JBoolean, Long => JLong}
-import java.util.{HashMap => JHashMap, Map => JMap}
-import scala.collection.JavaConverters._
-
-import backtype.storm.Config
-import backtype.storm.generated.StormTopology
-import org.apache.storm.shade.org.json.simple.JSONValue
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
-import io.gearpump.experiments.storm.util.StormConstants._
-import io.gearpump.experiments.storm.util.StormUtil._
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.TaskId
-
-class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
-  property("convert Storm task ids to gearpump TaskIds and back") {
-    val idGen = Gen.chooseNum[Int](0, Int.MaxValue)
-    forAll(idGen) { (stormTaskId: Int) =>
-      gearpumpTaskIdToStorm(stormTaskIdToGearpump(stormTaskId)) shouldBe stormTaskId
-    }
-
-    val processorIdGen = Gen.chooseNum[Int](0, Int.MaxValue >> 16)
-    val indexGen = Gen.chooseNum[Int](0, Int.MaxValue >> 16)
-    forAll(processorIdGen, indexGen) { (processorId: Int, index: Int) =>
-      val taskId = TaskId(processorId, index)
-      stormTaskIdToGearpump(gearpumpTaskIdToStorm(taskId)) shouldBe taskId
-    }
-  }
-
-  property("get GearpumpStormComponent from user config") {
-    val taskContext = MockUtil.mockTaskContext
-    val topology = TopologyUtil.getTestTopology
-    implicit val actorSystem = taskContext.system
-    val userConfig = UserConfig.empty
-      .withValue[StormTopology](STORM_TOPOLOGY, topology)
-      .withValue[JMap[AnyRef, AnyRef]](STORM_CONFIG, new JHashMap[AnyRef, AnyRef])
-    topology.get_spouts.asScala.foreach { case (spoutId, _) =>
-      val config = userConfig.withString(STORM_COMPONENT, spoutId)
-      val component = getGearpumpStormComponent(taskContext, config)(taskContext.system)
-      component shouldBe a[GearpumpSpout]
-    }
-    topology.get_bolts.asScala.foreach { case (boltId, _) =>
-      val config = userConfig.withString(STORM_COMPONENT, boltId)
-      val component = getGearpumpStormComponent(taskContext, config)(taskContext.system)
-      component shouldBe a[GearpumpBolt]
-    }
-  }
-
-  property("parse json to map") {
-    val mapGen = Gen.listOf[String](Gen.alphaStr)
-      .map(_.map(s => (s, s)).toMap.asJava.asInstanceOf[JMap[AnyRef, AnyRef]])
-
-    forAll(mapGen) { (map: JMap[AnyRef, AnyRef]) =>
-      parseJsonStringToMap(JSONValue.toJSONString(map)) shouldBe map
-    }
-
-    val invalidJsonGen: Gen[String] = Gen.oneOf(null, "", "1")
-    forAll(invalidJsonGen) { (invalidJson: String) =>
-      val map = parseJsonStringToMap(invalidJson)
-      map shouldBe empty
-      map shouldBe a[JMap[_, _]]
-    }
-  }
-
-  property("get int from config") {
-    val name = "int"
-    val conf: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
-    getInt(conf, name) shouldBe None
-    conf.put(name, null)
-    getInt(conf, name) shouldBe None
-
-    forAll(Gen.chooseNum[Int](Int.MinValue, Int.MaxValue)) { (int: Int) =>
-      conf.put(name, new Integer(int))
-      getInt(conf, name) shouldBe Some(int)
-    }
-
-    forAll(Gen.chooseNum[Long](Int.MinValue, Int.MaxValue)) { (long: Long) =>
-      conf.put(name, new JLong(long))
-      getInt(conf, name) shouldBe Some(long)
-    }
-
-    forAll(Gen.alphaStr) { (s: String) =>
-      conf.put(name, s)
-      an[IllegalArgumentException] should be thrownBy getInt(conf, name)
-    }
-  }
-
-  property("get boolean from config") {
-    val name = "boolean"
-    val conf: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
-    getBoolean(conf, name) shouldBe None
-    conf.put(name, null)
-    getBoolean(conf, name) shouldBe None
-
-    forAll(Gen.oneOf(true, false)) { (boolean: Boolean) =>
-      conf.put(name, new JBoolean(boolean))
-      getBoolean(conf, name) shouldBe Some(boolean)
-    }
-
-    forAll(Gen.alphaStr) { (s: String) =>
-      conf.put(name, s)
-      an[IllegalArgumentException] should be thrownBy getBoolean(conf, name)
-    }
-  }
-
-  property("mod should be correct") {
-    mod(10, 5) shouldBe 0
-    mod(10, 6) shouldBe 4
-    mod(10, -3) shouldBe -2
-    mod(-2, 5) shouldBe 3
-    mod(-1, -2) shouldBe -1
-  }
-
-  property("get whether ack enabled") {
-    val conf: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
-    ackEnabled(conf) shouldBe false
-    conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, new Integer(0))
-    ackEnabled(conf) shouldBe false
-    conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, null)
-    ackEnabled(conf) shouldBe true
-    forAll(Gen.chooseNum[Int](Int.MinValue, Int.MaxValue)) {
-      (ackers: Int) =>
-        conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, new Integer(ackers))
-        if (ackers == 0) {
-          ackEnabled(conf) shouldBe false
-        } else {
-          ackEnabled(conf) shouldBe true
-        }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/TopologyUtil.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/TopologyUtil.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/TopologyUtil.scala
deleted file mode 100644
index 8491786..0000000
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/TopologyUtil.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.util
-
-import backtype.storm.generated.StormTopology
-import backtype.storm.testing.{TestGlobalCount, TestWordCounter, TestWordSpout}
-import backtype.storm.topology.TopologyBuilder
-import backtype.storm.tuple.Fields
-import backtype.storm.utils.Utils
-
-object TopologyUtil {
-  val DEFAULT_STREAM_ID = Utils.DEFAULT_STREAM_ID
-  val DEFAULT_COMPONENT_ID = "component"
-
-  def getTestTopology: StormTopology = {
-    val topologyBuilder = new TopologyBuilder
-    topologyBuilder.setSpout("1", new TestWordSpout(true), 5)
-    topologyBuilder.setSpout("2", new TestWordSpout(true), 3)
-    topologyBuilder.setBolt("3", new TestWordCounter(), 3)
-      .fieldsGrouping("1", new Fields("word"))
-      .fieldsGrouping("2", new Fields("word"))
-    topologyBuilder.setBolt("4", new TestGlobalCount()).globalGrouping("1")
-    topologyBuilder.createTopology()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
new file mode 100644
index 0000000..5513423
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.partitioner
+
+import java.util.{List => JList}
+import scala.collection.JavaConverters._
+
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
+import org.apache.gearpump.partitioner.Partitioner
+
+/**
+ * Property test for StormPartitioner. A partitioner created for a component that appears in
+ * the tuple's target partitions is expected to return the partitions recorded for it; one
+ * created for a component that does not appear is expected to return UNKNOWN_PARTITION_ID.
+ */
+class StormPartitionerSpec extends PropSpec with PropertyChecks with Matchers {
+
+  property("StormPartitioner should get partitions directed by message and target") {
+    val idGen = Gen.chooseNum[Int](0, Int.MaxValue)
+    // Partition ids stop one short of Int.MaxValue so `ps.last + 1` below cannot overflow.
+    val partitionIdGen = Gen.chooseNum[Int](0, Int.MaxValue - 1)
+    // At least two distinct names: all but the last become targets, the last is the non-target.
+    val componentsGen = Gen.listOf[String](Gen.alphaStr).map(_.distinct).suchThat(_.size > 1)
+    val partitionsGen =
+      Gen.listOf[Int](partitionIdGen).suchThat(_.nonEmpty).map(_.distinct.sorted.toArray)
+    // Yields a factory, not a tuple: the target partitions are only assembled inside forAll.
+    val tupleFactoryGen = for {
+      values <- Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]])
+      sourceTaskId <- idGen
+      sourceStreamId <- Gen.alphaStr
+    } yield (targetPartitions: Map[String, Array[Int]]) => {
+      new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, targetPartitions)
+    }
+
+    forAll(tupleFactoryGen, idGen, componentsGen, partitionsGen) {
+      (tupleFactory: Map[String, Array[Int]] => GearpumpTuple, id: Int,
+        components: List[String], partitions: Array[Int]) => {
+        val currentPartitionId = id
+        val targetPartitions = components.init.map(c => (c, partitions)).toMap
+        val tuple = tupleFactory(targetPartitions)
+        targetPartitions.foreach {
+          case (target, ps) => {
+            val partitioner = new StormPartitioner(target)
+            ps shouldBe partitioner.getPartitions(Message(tuple), ps.last + 1, currentPartitionId)
+          }
+        }
+        val partitionNum = id
+        val nonTarget = components.last
+        val partitioner = new StormPartitioner(nonTarget)
+
+        partitioner.getPartitions(Message(tuple), partitionNum,
+          currentPartitionId) shouldBe List(Partitioner.UNKNOWN_PARTITION_ID)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala
new file mode 100644
index 0000000..430b1c0
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.processor
+
+import java.util.{List => JList}
+
+import backtype.storm.tuple.Tuple
+import backtype.storm.utils.Utils
+import org.apache.gearpump.experiments.storm.util.StormOutputCollector
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Checks that StormBoltOutputCollector hands emitted values to the wrapped
+ * StormOutputCollector and that calling fail raises an exception.
+ */
+class StormBoltOutputCollectorSpec
+  extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  property("StormBoltOutputCollector should call StormOutputCollector") {
+    val valGen = Gen.oneOf(Gen.alphaStr, Gen.alphaChar, Gen.chooseNum[Int](0, 1000))
+    val valuesGen = Gen.listOf[AnyRef](valGen)
+
+    forAll(valuesGen) { (values: List[AnyRef]) =>
+      val collector = mock[StormOutputCollector]
+      val boltCollector = new StormBoltOutputCollector(collector)
+      val streamId = Utils.DEFAULT_STREAM_ID
+      // Second argument is passed as null (presumably Storm's anchor tuples; TODO confirm).
+      boltCollector.emit(streamId, null, values.asJava)
+      verify(collector).emit(streamId, values.asJava)
+    }
+  }
+
+  property("StormBoltOutputCollector should throw on fail") {
+    val collector = mock[StormOutputCollector]
+    val tuple = mock[Tuple]
+    val boltCollector = new StormBoltOutputCollector(collector)
+    an[Exception] should be thrownBy boltCollector.fail(tuple)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala
new file mode 100644
index 0000000..2111df6
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.processor
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.StartTime
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, WordSpec}
+
+/**
+ * Checks that StormProcessor forwards its task callbacks to the wrapped GearpumpBolt:
+ * onStart to start, ordinary messages to next, and the TICK message to tick.
+ */
+class StormProcessorSpec extends WordSpec with Matchers with MockitoSugar {
+
+  "StormProcessor" should {
+    "start GearpumpBolt onStart" in {
+      val startTime = mock[StartTime]
+      val gearpumpBolt = mock[GearpumpBolt]
+      when(gearpumpBolt.getTickFrequency).thenReturn(None)
+      val taskContext = MockUtil.mockTaskContext
+      val userConfig = UserConfig.empty
+      val stormProcessor = new StormProcessor(gearpumpBolt, taskContext, userConfig)
+
+      stormProcessor.onStart(startTime)
+
+      verify(gearpumpBolt).start(startTime)
+    }
+
+    "pass message to GearpumpBolt onNext" in {
+      val message = mock[Message]
+      val gearpumpBolt = mock[GearpumpBolt]
+      val freq = 5
+      when(gearpumpBolt.getTickFrequency).thenReturn(Some(freq))
+      val taskContext = MockUtil.mockTaskContext
+      val userConfig = UserConfig.empty
+      val stormProcessor = new StormProcessor(gearpumpBolt, taskContext, userConfig)
+
+      // An ordinary message is expected to be handed to the bolt's next.
+      stormProcessor.onNext(message)
+
+      verify(gearpumpBolt).next(message)
+
+      // The TICK message is expected to reach the bolt as tick(freq) instead.
+      stormProcessor.onNext(StormProcessor.TICK)
+
+      verify(gearpumpBolt).tick(freq)
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
new file mode 100644
index 0000000..39a008f
--- /dev/null
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.producer
+
+import akka.testkit.TestProbe
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.task.StartTime
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, WordSpec}
+
+/**
+ * Checks that StormProducer forwards its task callbacks to the wrapped GearpumpSpout
+ * and that the expected follow-up messages arrive at the ref behind taskContext.self.
+ */
+class StormProducerSpec extends WordSpec with Matchers with MockitoSugar {
+
+  "StormProducer" should {
+    "start GearpumpSpout onStart" in {
+      val startTime = mock[StartTime]
+      val gearpumpSpout = mock[GearpumpSpout]
+      when(gearpumpSpout.getMessageTimeout).thenReturn(None)
+      val taskContext = MockUtil.mockTaskContext
+      implicit val actorSystem = taskContext.system
+      // A TestProbe backs taskContext.self so messages sent to that ref can be asserted.
+      val taskActor = TestProbe()
+      when(taskContext.self).thenReturn(taskActor.ref)
+      val userConfig = UserConfig.empty
+      val stormProducer = new StormProducer(gearpumpSpout, taskContext, userConfig)
+
+      stormProducer.onStart(startTime)
+
+      verify(gearpumpSpout).start(startTime)
+      taskActor.expectMsg(Message("start"))
+    }
+
+    "pass message to GearpumpSpout onNext" in {
+      val message = mock[Message]
+      val gearpumpSpout = mock[GearpumpSpout]
+      val timeout = 5L
+      when(gearpumpSpout.getMessageTimeout).thenReturn(Some(timeout))
+      val taskContext = MockUtil.mockTaskContext
+      implicit val actorSystem = taskContext.system
+      val taskActor = TestProbe()
+      when(taskContext.self).thenReturn(taskActor.ref)
+      val userConfig = UserConfig.empty
+      val stormProducer = new StormProducer(gearpumpSpout, taskContext, userConfig)
+
+      stormProducer.onNext(message)
+
+      verify(gearpumpSpout).next(message)
+      taskActor.expectMsg(Message("continue"))
+
+      // The TIMEOUT message is expected to reach the spout as timeout(timeout) instead.
+      stormProducer.onNext(StormProducer.TIMEOUT)
+      verify(gearpumpSpout).timeout(timeout)
+    }
+  }
+}


Reply via email to