http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala new file mode 100644 index 0000000..f95b840 --- /dev/null +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.gearpump.experiments.storm.util

import java.util.{ArrayList => JArrayList, Iterator => JIterator, List => JList, Map => JMap}
import scala.collection.JavaConverters._

import backtype.storm.generated.{GlobalStreamId, Grouping, JavaObject}
import backtype.storm.grouping.CustomStreamGrouping
import backtype.storm.task.TopologyContext
import backtype.storm.tuple.Fields
import backtype.storm.utils.Utils
import org.slf4j.Logger

import org.apache.gearpump._
import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
import org.apache.gearpump.experiments.storm.util.StormUtil._
import org.apache.gearpump.streaming.ProcessorId
import org.apache.gearpump.streaming.task.{TaskContext, TaskId}
import org.apache.gearpump.util.LogUtil

/**
 * Factory for [[StormOutputCollector]] plus the helpers that translate Storm groupings
 * into [[Grouper]]s and Storm task ids into Gearpump partitions.
 */
object StormOutputCollector {
  private val LOG: Logger = LogUtil.getLogger(classOf[StormOutputCollector])

  // Returned by emit for streams that have no targets. This is one shared, mutable
  // ArrayList instance, so callers must treat the returned list as read-only.
  private[storm] val EMPTY_LIST: JList[Integer] = new JArrayList[Integer](0)

  /**
   * Builds a collector for the Storm task described by `topologyContext`.
   *
   * One [[Grouper]] is created for every (stream, target component) pair whose grouping
   * is not a direct grouping; direct groupings are served by
   * [[StormOutputCollector.emitDirect]] instead.
   *
   * @param taskContext Gearpump task context used to output messages
   * @param topologyContext Storm topology context of the current task
   * @return a collector whose initial timestamp is `LatestTime`
   */
  def apply(taskContext: TaskContext, topologyContext: TopologyContext): StormOutputCollector = {
    val stormTaskId = topologyContext.getThisTaskId
    val componentId = topologyContext.getThisComponentId
    val taskToComponent = topologyContext.getTaskToComponent
    val componentToProcessorId = getComponentToProcessorId(taskToComponent.asScala.toMap)
    val targets = topologyContext.getTargets(componentId)
    // Keyed by (streamId, target): a stream may feed several targets, and each grouper is
    // built from its own target's grouping and task count. Keying by streamId alone would
    // keep only one of them and apply it to every target.
    val streamGroupers: Map[(String, String), Grouper] =
      targets.asScala.flatMap { case (streamId, targetGrouping) =>
        targetGrouping.asScala.collect { case (target, grouping) if !grouping.is_set_direct() =>
          val grouper = getGrouper(topologyContext, grouping, componentId, streamId, target)
          (streamId, target) -> grouper
        }
      }.toMap
    val getTargetPartitionsFn = (streamId: String, values: JList[AnyRef]) => {
      getTargetPartitions(stormTaskId, streamId, targets,
        streamGroupers, componentToProcessorId, values)
    }
    new StormOutputCollector(stormTaskId, taskToComponent, targets, getTargetPartitionsFn,
      taskContext, LatestTime)
  }

  /**
   * Gets target Gearpump partitions and Storm task ids for a tuple emitted on `streamId`.
   *
   * @param stormTaskId Storm task id of the emitting task
   * @param streamId Storm stream id; must be a key of `targets`
   * @param targets stream id -> (target component -> grouping)
   * @param streamGroupers groupers keyed by (stream id, target component)
   * @param componentToProcessorId target component -> Gearpump processor id
   * @param values Storm tuple values handed to each grouper
   * @return (target component -> chosen partitions, Storm task ids of all chosen partitions)
   * @throws NoSuchElementException if a target of the stream has no grouper, which is the
   *                                case for direct groupings
   */
  private def getTargetPartitions(
      stormTaskId: Int,
      streamId: String,
      targets: JMap[String, JMap[String, Grouping]],
      streamGroupers: Map[(String, String), Grouper],
      componentToProcessorId: Map[String, ProcessorId],
      values: JList[AnyRef]): (Map[String, Array[Int]], JList[Integer]) = {
    val ret: JList[Integer] = new JArrayList[Integer](targets.size)

    @annotation.tailrec
    def getRecur(iter: JIterator[String],
        accum: Map[String, Array[Int]]): Map[String, Array[Int]] = {
      if (iter.hasNext) {
        val target = iter.next
        val grouper = streamGroupers.getOrElse((streamId, target),
          throw new NoSuchElementException(
            s"no grouper for stream $streamId to target $target; " +
              "targets with direct grouping can only be reached through emitDirect"))
        val partitions = grouper.getPartitions(stormTaskId, values)
        partitions.foreach { p =>
          // translate (processor, partition) back into the Storm task id space
          val targetStormTaskId = gearpumpTaskIdToStorm(TaskId(componentToProcessorId(target), p))
          ret.add(targetStormTaskId)
        }
        getRecur(iter, accum + (target -> partitions))
      } else {
        accum
      }
    }
    val targetPartitions = getRecur(targets.get(streamId).keySet().iterator,
      Map.empty[String, Array[Int]])
    (targetPartitions, ret)
  }

  /**
   * Maps each Storm component to the Gearpump processor id encoded in its task ids.
   */
  private def getComponentToProcessorId(taskToComponent: Map[Integer, String])
    : Map[String, ProcessorId] = {
    taskToComponent.map { case (id, component) =>
      component -> stormTaskIdToGearpump(id).processorId
    }
  }

  /**
   * Creates the [[Grouper]] matching a Storm grouping from `source` to `target`.
   *
   * @throws Exception for a direct grouping, which callers are expected to filter out
   */
  private def getGrouper(topologyContext: TopologyContext, grouping: Grouping,
      source: String, streamId: String, target: String): Grouper = {
    val outFields = topologyContext.getComponentOutputFields(source, streamId)
    val targetTasks = topologyContext.getComponentTasks(target)
    val targetTaskNum = targetTasks.size
    val globalStreamId = new GlobalStreamId(source, streamId)

    grouping.getSetField match {
      case Grouping._Fields.FIELDS =>
        // a fields grouping on no fields is treated as a global grouping
        if (isGlobalGrouping(grouping)) {
          new GlobalGrouper
        } else {
          new FieldsGrouper(outFields, new Fields(grouping.get_fields()), targetTaskNum)
        }
      case Grouping._Fields.SHUFFLE =>
        new ShuffleGrouper(targetTaskNum)
      case Grouping._Fields.NONE =>
        new NoneGrouper(targetTaskNum)
      case Grouping._Fields.ALL =>
        new AllGrouper(targetTaskNum)
      case Grouping._Fields.CUSTOM_SERIALIZED =>
        val customGrouping = Utils.javaDeserialize(grouping.get_custom_serialized,
          classOf[Serializable]).asInstanceOf[CustomStreamGrouping]
        val grouper = new CustomGrouper(customGrouping)
        grouper.prepare(topologyContext, globalStreamId, targetTasks)
        grouper
      case Grouping._Fields.CUSTOM_OBJECT =>
        val customObject = grouping.get_custom_object()
        val customGrouping = instantiateJavaObject(customObject)
        val grouper = new CustomGrouper(customGrouping)
        grouper.prepare(topologyContext, globalStreamId, targetTasks)
        grouper
      case Grouping._Fields.LOCAL_OR_SHUFFLE =>
        // Gearpump has built-in support for sending messages to local actor
        new ShuffleGrouper(targetTaskNum)
      case Grouping._Fields.DIRECT =>
        throw new Exception("direct grouping should not be called here")
    }
  }

  /** A fields grouping with an empty field list. */
  private def isGlobalGrouping(grouping: Grouping): Boolean = {
    grouping.getSetField == Grouping._Fields.FIELDS &&
      grouping.get_fields.isEmpty
  }

  /**
   * Reflectively instantiates the custom grouping described by a thrift `JavaObject`.
   *
   * The constructor is looked up by the runtime classes of the argument values, so only a
   * public constructor whose declared parameter types equal those classes is found.
   */
  private def instantiateJavaObject(javaObject: JavaObject): CustomStreamGrouping = {
    val className = javaObject.get_full_class_name()
    val args = javaObject.get_args_list().asScala.map(_.getFieldValue)
    // `args: _*` spreads the values as individual constructor arguments; passing `args`
    // itself would hand the constructor a single collection argument.
    Class.forName(className)
      .getConstructor(args.map(_.getClass): _*)
      .newInstance(args: _*)
      .asInstanceOf[CustomStreamGrouping]
  }
}

/**
 * Provides common functionality for
 * [[org.apache.gearpump.experiments.storm.producer.StormSpoutOutputCollector]]
 * and [[org.apache.gearpump.experiments.storm.processor.StormBoltOutputCollector]]
 *
 * @param stormTaskId Storm task id of the emitting task
 * @param taskToComponent Storm task id -> component id
 * @param targets stream id -> (target component -> grouping)
 * @param getTargetPartitionsFn computes, for a stream id and tuple values, the target
 *                              partitions per component and the target Storm task ids
 * @param taskContext Gearpump task context used to output messages
 * @param timestamp timestamp attached to every emitted message until changed through
 *                  `setTimestamp`
 */
class StormOutputCollector(
    stormTaskId: Int,
    taskToComponent: JMap[Integer, String],
    targets: JMap[String, JMap[String, Grouping]],
    getTargetPartitionsFn: (String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer]),
    val taskContext: TaskContext,
    private var timestamp: TimeStamp) {
  import org.apache.gearpump.experiments.storm.util.StormOutputCollector._

  /**
   * Emits tuple values into a stream (invoked by a Storm output collector).
   *
   * Wraps the values into a message of [[GearpumpTuple]] along with the target partitions
   * to tell [[org.apache.gearpump.experiments.storm.partitioner.StormPartitioner]] where to send
   * the message. We also return the corresponding target Storm task ids back to the collector.
   * Nothing is sent, and the shared empty list is returned, when the stream has no targets.
   *
   * @param streamId Storm stream id
   * @param values Storm tuple values
   * @return Target Storm task ids
   */
  def emit(streamId: String, values: JList[AnyRef]): JList[Integer] = {
    if (targets.containsKey(streamId)) {
      val (targetPartitions, targetStormTaskIds) = getTargetPartitionsFn(streamId, values)
      val tuple = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions)
      taskContext.output(Message(tuple, timestamp))
      targetStormTaskIds
    } else {
      EMPTY_LIST
    }
  }

  /**
   * Emits tuple values to a specific Storm task (invoked by Storm output collector).
   *
   * We translate the Storm task id into Gearpump TaskId and tell
   * [[org.apache.gearpump.experiments.storm.partitioner.StormPartitioner]] through the
   * targetPartitions field of [[org.apache.gearpump.experiments.storm.topology.GearpumpTuple]].
   * Nothing is sent when the stream has no targets.
   *
   * @param id Storm task id
   * @param streamId Storm stream id
   * @param values Storm tuple values
   */
  def emitDirect(id: Int, streamId: String, values: JList[AnyRef]): Unit = {
    if (targets.containsKey(streamId)) {
      val target = taskToComponent.get(id)
      val partition = stormTaskIdToGearpump(id).index
      val targetPartitions = Map(target -> Array(partition))
      val tuple = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions)
      taskContext.output(Message(tuple, timestamp))
    }
  }

  /**
   * Sets timestamp from each incoming Message if not attached.
   */
  def setTimestamp(timestamp: TimeStamp): Unit = {
    this.timestamp = timestamp
  }

  /** @return the timestamp currently attached to emitted messages */
  def getTimestamp: Long = timestamp
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormSerializationFramework.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormSerializationFramework.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormSerializationFramework.scala new file mode 100644 index 0000000..8bffc55 --- /dev/null +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormSerializationFramework.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.gearpump.experiments.storm.util

import java.lang.{Integer => JInteger}
import java.util.{Map => JMap}

import akka.actor.ExtendedActorSystem
import backtype.storm.serialization.SerializationFactory
import backtype.storm.utils.ListDelegate
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
import org.apache.gearpump.experiments.storm.util.StormConstants._
import org.apache.gearpump.serializer.{SerializationFramework, Serializer}

/**
 * [[SerializationFramework]] that hands out one [[StormSerializer]] per thread, each backed
 * by a Kryo instance obtained from Storm's `SerializationFactory` for the Storm config
 * stored under `STORM_CONFIG`.
 *
 * `init` must run before `get`; until then both fields are null.
 */
class StormSerializationFramework extends SerializationFramework {
  private var stormConfig: JMap[AnyRef, AnyRef] = null
  private var pool: ThreadLocal[Serializer] = null

  /**
   * Reads the Storm config from `config` and prepares the per-thread serializer pool.
   * Fails with the `Option.get` error if `STORM_CONFIG` is not present in `config`.
   */
  override def init(system: ExtendedActorSystem, config: UserConfig): Unit = {
    // NOTE(review): presumably picked up as the implicit actor system by config.getValue;
    // confirm before removing.
    implicit val actorSystem = system
    stormConfig = config.getValue[JMap[AnyRef, AnyRef]](STORM_CONFIG).get
    pool = new ThreadLocal[Serializer]() {
      // invoked lazily, once per thread, on that thread's first get()
      override def initialValue(): Serializer =
        new StormSerializer(SerializationFactory.getKryo(stormConfig))
    }
  }

  /** @return the serializer bound to the calling thread */
  override def get(): Serializer = pool.get()
}

/**
 * serializes / deserializes [[org.apache.gearpump.experiments.storm.topology.GearpumpTuple]]
 *
 * Wire layout: source task id (int), source stream id (string), then the tuple values
 * wrapped in a Storm `ListDelegate` and written through Kryo. Target partitions are not
 * serialized. The `output` / `input` buffers are reused across calls.
 *
 * @param kryo created by Storm [[backtype.storm.serialization.SerializationFactory]]
 */
class StormSerializer(kryo: Kryo) extends Serializer {
  // -1 means the max buffer size is 2147483647
  private val output = new Output(4096, -1)
  private val input = new Input

  /**
   * @param message must be a [[GearpumpTuple]]; anything else fails the cast
   * @return the serialized bytes of the tuple
   */
  override def serialize(message: Any): Array[Byte] = {
    val gearpumpTuple = message.asInstanceOf[GearpumpTuple]
    val wrappedValues = new ListDelegate
    wrappedValues.setDelegate(gearpumpTuple.values)

    // start from an empty buffer, then write the fields in wire order
    output.clear()
    output.writeInt(gearpumpTuple.sourceTaskId)
    output.writeString(gearpumpTuple.sourceStreamId)
    kryo.writeObject(output, wrappedValues)
    output.toBytes
  }

  /**
   * @param msg bytes produced by `serialize`
   * @return a [[GearpumpTuple]] whose target partitions are null
   */
  override def deserialize(msg: Array[Byte]): Any = {
    input.setBuffer(msg)
    // reads must mirror the write order used by serialize
    val taskId: JInteger = input.readInt
    val streamId: String = input.readString
    val values = kryo.readObject[ListDelegate](input, classOf[ListDelegate]).getDelegate
    new GearpumpTuple(values, taskId, streamId, null)
  }
}

// ---- patch boundary: the next file added by this commit is
// http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormUtil.scala
// (new file mode 100644, index 0000000..40e36a6, hunk @@ -0,0 +1,136 @@) ----
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.experiments.storm.util

import java.lang.{Boolean => JBoolean}
import java.util.{HashMap => JHashMap, Map => JMap}

import akka.actor.ActorSystem
import backtype.storm.Config
import backtype.storm.generated._
import org.apache.storm.shade.org.json.simple.JSONValue

import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
import org.apache.gearpump.experiments.storm.topology._
import org.apache.gearpump.experiments.storm.util.StormConstants._
import org.apache.gearpump.streaming.task.{TaskContext, TaskId}
import org.apache.gearpump.util.Util

/**
 * Helpers shared by the Storm compatibility layer: task id translation, component lookup
 * and Storm config access.
 */
object StormUtil {

  /**
   * Convert storm task id to gearpump [[org.apache.gearpump.streaming.task.TaskId]]
   *
   * The high 16 bit of an Int is TaskId.processorId
   * The low 16 bit of an Int is TaskId.index
   *
   * The shift is arithmetic (`>>`), so a negative id yields a negative processorId.
   */
  def stormTaskIdToGearpump(id: Integer): TaskId = {
    val index = id & 0xFFFF
    val processorId = id >> 16
    TaskId(processorId, index)
  }

  /**
   * convert gearpump [[TaskId]] to storm task id
   * TaskId.processorId is the high 16 bit of an Int
   * TaskId.index is the low 16 bit of an Int
   *
   * Only the low 16 bits of the index are kept.
   */
  def gearpumpTaskIdToStorm(taskId: TaskId): Integer = {
    val index = taskId.index
    val processorId = taskId.processorId
    (processorId << 16) + (index & 0xFFFF)
  }

  /**
   * Looks up the component named by `STORM_COMPONENT` among the topology's spouts, then
   * its bolts, and wraps it.
   *
   * Fails with the `Option.get` error if the topology, Storm config or component id is
   * missing from `conf`.
   *
   * @return a configured [[GearpumpStormComponent]]
   * @throws Exception if the component id is neither a spout nor a bolt of the topology
   */
  def getGearpumpStormComponent(
      taskContext: TaskContext, conf: UserConfig)(implicit system: ActorSystem)
    : GearpumpStormComponent = {
    val topology = conf.getValue[StormTopology](STORM_TOPOLOGY).get
    val stormConfig = conf.getValue[JMap[AnyRef, AnyRef]](STORM_CONFIG).get
    val componentId = conf.getString(STORM_COMPONENT).get
    val spouts = topology.get_spouts
    val bolts = topology.get_bolts
    if (spouts.containsKey(componentId)) {
      GearpumpSpout(topology, stormConfig, spouts.get(componentId), taskContext)
    } else if (bolts.containsKey(componentId)) {
      GearpumpBolt(topology, stormConfig, bolts.get(componentId), taskContext)
    } else {
      throw new Exception(s"storm component $componentId not found")
    }
  }

  /**
   * Parses config in json to map, returns empty map for invalid json string
   *
   * A null `json`, or json whose parsed value is not a map, also yields an empty map.
   *
   * @param json config in json
   * @return config in map
   */
  def parseJsonStringToMap(json: String): JMap[AnyRef, AnyRef] = {
    Option(json).flatMap(json => JSONValue.parse(json) match {
      case m: JMap[_, _] => Option(m.asInstanceOf[JMap[AnyRef, AnyRef]])
      case _ => None
    }).getOrElse(new JHashMap[AnyRef, AnyRef])
  }

  /**
   * get Int value of the config name
   *
   * Any `java.lang.Number` is accepted and converted with `intValue`.
   *
   * @return None if the name is absent or mapped to null
   * @throws IllegalArgumentException if the value is not a number
   */
  def getInt(conf: JMap[_, _], name: String): Option[Int] = {
    Option(conf.get(name)).map {
      case number: Number => number.intValue
      case invalid => throw new IllegalArgumentException(
        s"$name must be Java Integer; actual: ${invalid.getClass}")
    }
  }

  /**
   * get Boolean value of the config name
   *
   * @return None if the name is absent or mapped to null
   * @throws IllegalArgumentException if the value is not a `java.lang.Boolean`
   */
  def getBoolean(conf: JMap[_, _], name: AnyRef): Option[Boolean] = {
    Option(conf.get(name)).map {
      case b: JBoolean => b.booleanValue()
      case invalid => throw new IllegalArgumentException(
        s"$name must be a Java Boolean; actual: ${invalid.getClass}")
    }
  }

  /**
   * clojure mod ported from Storm
   * see https://clojuredocs.org/clojure.core/mod
   *
   * The result is zero or has the sign of `div`.
   *
   * @throws ArithmeticException if `div` is zero
   */
  def mod(num: Int, div: Int): Int = {
    val rem = num % div
    // `rem` carries the sign of `num`; shift it into the divisor's sign range when the
    // signs differ. Adding only in that case cannot overflow, unlike `(rem + div) % div`
    // which wraps around when rem + div exceeds Int.MaxValue.
    if (rem != 0 && ((rem < 0) != (div < 0))) rem + div else rem
  }

  /**
   * Whether acking is enabled for the topology.
   *
   * False when `TOPOLOGY_ACKER_EXECUTORS` is not a key of the config; when the key is
   * present, true unless its numeric value is 0 (a null value counts as enabled).
   */
  def ackEnabled(config: JMap[AnyRef, AnyRef]): Boolean = {
    if (config.containsKey(Config.TOPOLOGY_ACKER_EXECUTORS)) {
      getInt(config, Config.TOPOLOGY_ACKER_EXECUTORS).map(_ != 0).getOrElse(true)
    } else {
      false
    }
  }

  /**
   * @return a currently free port for the thrift server
   * @throws Exception if no free port could be found
   */
  def getThriftPort(): Int = {
    Util.findFreePort().getOrElse(
      throw new Exception("unable to find free port for thrift server"))
  }
}

// ---- patch boundary: the next file touched by this commit is
// http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala
---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala deleted file mode 100644 index ed4a6bb..0000000 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.storm.partitioner - -import java.util.{List => JList} -import scala.collection.JavaConverters._ - -import org.scalacheck.Gen -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -import io.gearpump.Message -import io.gearpump.experiments.storm.topology.GearpumpTuple -import io.gearpump.partitioner.Partitioner - -class StormPartitionerSpec extends PropSpec with PropertyChecks with Matchers { - - property("StormPartitioner should get partitions directed by message and target") { - val idGen = Gen.chooseNum[Int](0, Int.MaxValue) - val componentsGen = Gen.listOf[String](Gen.alphaStr).map(_.distinct).suchThat(_.size > 1) - val partitionsGen = Gen.listOf[Int](idGen).suchThat(_.nonEmpty).map(_.distinct.sorted.toArray) - val tupleFactoryGen = for { - values <- Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]]) - sourceTaskId <- idGen - sourceStreamId <- Gen.alphaStr - } yield (targetPartitions: Map[String, Array[Int]]) => { - new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, targetPartitions) - } - - forAll(tupleFactoryGen, idGen, componentsGen, partitionsGen) { - (tupleFactory: Map[String, Array[Int]] => GearpumpTuple, id: Int, - components: List[String], partitions: Array[Int]) => { - val currentPartitionId = id - val targetPartitions = components.init.map(c => (c, partitions)).toMap - val tuple = tupleFactory(targetPartitions) - targetPartitions.foreach { - case (target, ps) => { - val partitioner = new StormPartitioner(target) - ps shouldBe partitioner.getPartitions(Message(tuple), ps.last + 1, currentPartitionId) - } - } - val partitionNum = id - val nonTarget = components.last - val partitioner = new StormPartitioner(nonTarget) - - partitioner.getPartitions(Message(tuple), partitionNum, - currentPartitionId) shouldBe List(Partitioner.UNKNOWN_PARTITION_ID) - } - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala deleted file mode 100644 index 64cabd5..0000000 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.storm.processor - -import java.util.{List => JList} -import scala.collection.JavaConverters._ - -import backtype.storm.tuple.Tuple -import backtype.storm.utils.Utils -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -import io.gearpump.experiments.storm.util.StormOutputCollector - -class StormBoltOutputCollectorSpec - extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - - property("StormBoltOutputCollector should call StormOutputCollector") { - val valGen = Gen.oneOf(Gen.alphaStr, Gen.alphaChar, Gen.chooseNum[Int](0, 1000)) - val valuesGen = Gen.listOf[AnyRef](valGen) - - forAll(valuesGen) { (values: List[AnyRef]) => - val collector = mock[StormOutputCollector] - val boltCollector = new StormBoltOutputCollector(collector) - val streamId = Utils.DEFAULT_STREAM_ID - boltCollector.emit(streamId, null, values.asJava) - verify(collector).emit(streamId, values.asJava) - } - } - - property("StormBoltOutputCollector should throw on fail") { - val collector = mock[StormOutputCollector] - val tuple = mock[Tuple] - val boltCollector = new StormBoltOutputCollector(collector) - an[Exception] should be thrownBy boltCollector.fail(tuple) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormProcessorSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormProcessorSpec.scala deleted file mode 100644 index a3a8196..0000000 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/processor/StormProcessorSpec.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to 
the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.storm.processor - -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{Matchers, WordSpec} - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.task.StartTime - -class StormProcessorSpec extends WordSpec with Matchers with MockitoSugar { - - "StormProcessor" should { - "start GearpumpSpout onStart" in { - val startTime = mock[StartTime] - val gearpumpBolt = mock[GearpumpBolt] - when(gearpumpBolt.getTickFrequency).thenReturn(None) - val taskContext = MockUtil.mockTaskContext - val userConfig = UserConfig.empty - val stormProcessor = new StormProcessor(gearpumpBolt, taskContext, userConfig) - - stormProcessor.onStart(startTime) - - verify(gearpumpBolt).start(startTime) - } - - "pass message to GearpumpBolt onNext" in { - val message = mock[Message] - val gearpumpBolt = mock[GearpumpBolt] - val freq = 5 - when(gearpumpBolt.getTickFrequency).thenReturn(Some(freq)) - val taskContext = MockUtil.mockTaskContext - val userConfig = UserConfig.empty - val 
stormProcessor = new StormProcessor(gearpumpBolt, taskContext, userConfig) - - stormProcessor.onNext(message) - - verify(gearpumpBolt).next(message) - - stormProcessor.onNext(StormProcessor.TICK) - - verify(gearpumpBolt).tick(freq) - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormProducerSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormProducerSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormProducerSpec.scala deleted file mode 100644 index 8c10afc..0000000 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormProducerSpec.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.storm.producer - -import akka.testkit.TestProbe -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{Matchers, WordSpec} - -import io.gearpump.Message -import io.gearpump.cluster.UserConfig -import io.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.task.StartTime - -class StormProducerSpec extends WordSpec with Matchers with MockitoSugar { - - "StormProducer" should { - "start GearpumpSpout onStart" in { - val startTime = mock[StartTime] - val gearpumpSpout = mock[GearpumpSpout] - when(gearpumpSpout.getMessageTimeout).thenReturn(None) - val taskContext = MockUtil.mockTaskContext - implicit val actorSystem = taskContext.system - val taskActor = TestProbe() - when(taskContext.self).thenReturn(taskActor.ref) - val userConfig = UserConfig.empty - val stormProducer = new StormProducer(gearpumpSpout, taskContext, userConfig) - - stormProducer.onStart(startTime) - - verify(gearpumpSpout).start(startTime) - taskActor.expectMsg(Message("start")) - } - - "pass message to GearpumpBolt onNext" in { - val message = mock[Message] - val gearpumpSpout = mock[GearpumpSpout] - val timeout = 5L - when(gearpumpSpout.getMessageTimeout).thenReturn(Some(timeout)) - val taskContext = MockUtil.mockTaskContext - implicit val actorSystem = taskContext.system - val taskActor = TestProbe() - when(taskContext.self).thenReturn(taskActor.ref) - val userConfig = UserConfig.empty - val stormProducer = new StormProducer(gearpumpSpout, taskContext, userConfig) - - stormProducer.onNext(message) - - verify(gearpumpSpout).next(message) - taskActor.expectMsg(Message("continue")) - - stormProducer.onNext(StormProducer.TIMEOUT) - verify(gearpumpSpout).timeout(timeout) - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala deleted file mode 100644 index e638da9..0000000 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.storm.producer - -import java.util.{List => JList} -import scala.collection.JavaConverters._ - -import backtype.storm.spout.ISpout -import backtype.storm.utils.Utils -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -import io.gearpump.experiments.storm.util.StormOutputCollector - -class StormSpoutOutputCollectorSpec - extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - - property("StormSpoutOutputCollector should call StormOutputCollector") { - val valGen = Gen.oneOf(Gen.alphaStr, Gen.alphaChar, Gen.chooseNum[Int](0, 1000)) - val valuesGen = Gen.listOf[AnyRef](valGen) - - forAll(valuesGen) { (values: List[AnyRef]) => - val collector = mock[StormOutputCollector] - val spout = mock[ISpout] - val streamId = Utils.DEFAULT_STREAM_ID - val spoutCollector = new StormSpoutOutputCollector(collector, spout, false) - spoutCollector.emit(streamId, values.asJava, null) - verify(collector).emit(streamId, values.asJava) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala deleted file mode 100644 index 319ee27..0000000 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.experiments.storm.topology - -import java.util.{Map => JMap} - -import akka.actor.ActorRef -import backtype.storm.spout.{ISpout, SpoutOutputCollector} -import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, TopologyContext} -import backtype.storm.tuple.Tuple -import org.mockito.Matchers.{anyObject, eq => mockitoEq} -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -import io.gearpump.experiments.storm.producer.StormSpoutOutputCollector -import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout} -import io.gearpump.experiments.storm.util.StormOutputCollector -import io.gearpump.streaming.task.{StartTime, TaskContext, TaskId} -import io.gearpump.streaming.{DAG, MockUtil} -import io.gearpump.{Message, TimeStamp} - -class GearpumpStormComponentSpec - extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - - property("GearpumpSpout lifecycle") { - val config = mock[JMap[AnyRef, AnyRef]] - val spout = mock[ISpout] - val taskContext = MockUtil.mockTaskContext - val appMaster = mock[ActorRef] - when(taskContext.appMaster).thenReturn(appMaster) - val 
getDAG = mock[ActorRef => DAG] - val dag = mock[DAG] - when(getDAG(appMaster)).thenReturn(dag) - val getTopologyContext = mock[(DAG, TaskId) => TopologyContext] - val topologyContext = mock[TopologyContext] - when(getTopologyContext(dag, taskContext.taskId)).thenReturn(topologyContext) - val getOutputCollector = mock[(TaskContext, TopologyContext) => StormSpoutOutputCollector] - val outputCollector = mock[StormSpoutOutputCollector] - when(getOutputCollector(taskContext, topologyContext)).thenReturn(outputCollector) - - val gearpumpSpout = GearpumpSpout(config, spout, getDAG, getTopologyContext, - getOutputCollector, ackEnabled = false, taskContext) - - // Start - val startTime = mock[StartTime] - gearpumpSpout.start(startTime) - - verify(spout).open(mockitoEq(config), mockitoEq(topologyContext), - anyObject[SpoutOutputCollector]) - - // Next - val message = mock[Message] - gearpumpSpout.next(message) - - verify(spout).nextTuple() - } - - property("GearpumpBolt lifecycle") { - val timestampGen = Gen.chooseNum[Long](0L, 1000L) - val freqGen = Gen.chooseNum[Int](1, 100) - forAll(timestampGen, freqGen) { (timestamp: TimeStamp, freq: Int) => - val config = mock[JMap[AnyRef, AnyRef]] - val bolt = mock[IBolt] - val taskContext = MockUtil.mockTaskContext - val appMaster = mock[ActorRef] - when(taskContext.appMaster).thenReturn(appMaster) - val getDAG = mock[ActorRef => DAG] - val dag = mock[DAG] - when(getDAG(appMaster)).thenReturn(dag) - val getTopologyContext = mock[(DAG, TaskId) => TopologyContext] - val topologyContext = mock[TopologyContext] - when(getTopologyContext(dag, taskContext.taskId)).thenReturn(topologyContext) - val getGeneralTopologyContext = mock[DAG => GeneralTopologyContext] - val generalTopologyContext = mock[GeneralTopologyContext] - when(getGeneralTopologyContext(dag)).thenReturn(generalTopologyContext) - val getOutputCollector = mock[(TaskContext, TopologyContext) => StormOutputCollector] - val stormOutputCollector = mock[StormOutputCollector] - 
when(getOutputCollector(taskContext, topologyContext)).thenReturn(stormOutputCollector) - val getTickTuple = mock[(GeneralTopologyContext, Int) => Tuple] - val tickTuple = mock[Tuple] - when(getTickTuple(mockitoEq(generalTopologyContext), anyObject[Int]())).thenReturn(tickTuple) - val gearpumpBolt = GearpumpBolt(config, bolt, getDAG, getTopologyContext, - getGeneralTopologyContext, getOutputCollector, getTickTuple, taskContext) - - // Start - val startTime = mock[StartTime] - gearpumpBolt.start(startTime) - - verify(bolt).prepare(mockitoEq(config), mockitoEq(topologyContext), - anyObject[OutputCollector]) - - // Next - val gearpumpTuple = mock[GearpumpTuple] - val tuple = mock[Tuple] - when(gearpumpTuple.toTuple(generalTopologyContext, timestamp)).thenReturn(tuple) - val message = Message(gearpumpTuple, timestamp) - gearpumpBolt.next(message) - - verify(stormOutputCollector).setTimestamp(timestamp) - verify(bolt).execute(tuple) - - // Tick - gearpumpBolt.tick(freq) - verify(bolt).execute(tickTuple) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala deleted file mode 100644 index f9ffc5f..0000000 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.storm.topology - -import java.util.{HashMap => JHashMap, Map => JMap} -import scala.collection.JavaConverters._ - -import backtype.storm.Config -import org.scalatest.mock.MockitoSugar -import org.scalatest.{Matchers, WordSpec} - -import io.gearpump.experiments.storm.processor.StormProcessor -import io.gearpump.experiments.storm.producer.StormProducer -import io.gearpump.experiments.storm.util.TopologyUtil -import io.gearpump.streaming.MockUtil - -class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar { - import io.gearpump.experiments.storm.topology.GearpumpStormTopologySpec._ - - "GearpumpStormTopology" should { - "merge configs with defined priority" in { - val stormTopology = TopologyUtil.getTestTopology - val name = "name" - val sysVal = "sys" - val sysConfig = newJavaConfig(name, sysVal) - val appVal = "app" - val appConfig = newJavaConfig(name, appVal) - - implicit val system = MockUtil.system - val topology1 = new GearpumpStormTopology("topology1", stormTopology, newEmptyConfig, - newEmptyConfig) - topology1.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology1" - topology1.getStormConfig should not contain name - - val topology2 = new GearpumpStormTopology("topology2", stormTopology, sysConfig, - newEmptyConfig) - topology2.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology2" - 
topology2.getStormConfig.get(name) shouldBe sysVal - - val topology3 = new GearpumpStormTopology("topology3", stormTopology, sysConfig, appConfig) - topology3.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology3" - topology3.getStormConfig.get(name) shouldBe appVal - } - - "create Gearpump processors from Storm topology" in { - val stormTopology = TopologyUtil.getTestTopology - implicit val system = MockUtil.system - val gearpumpStormTopology = - GearpumpStormTopology("app", stormTopology, null) - val processors = gearpumpStormTopology.getProcessors - stormTopology.get_spouts().asScala.foreach { case (spoutId, _) => - val processor = processors(spoutId) - processor.taskClass shouldBe classOf[StormProducer] - processor.description shouldBe spoutId - } - stormTopology.get_bolts().asScala.foreach { case (boltId, _) => - val processor = processors(boltId) - processor.taskClass shouldBe classOf[StormProcessor] - processor.description shouldBe boltId - } - } - - "get target processors from source id" in { - val stormTopology = TopologyUtil.getTestTopology - implicit val system = MockUtil.system - val sysConfig = new JHashMap[AnyRef, AnyRef] - val gearpumpStormTopology = - GearpumpStormTopology("app", stormTopology, null) - val targets0 = gearpumpStormTopology.getTargets("1") - targets0 should contain key "3" - targets0 should contain key "4" - val targets1 = gearpumpStormTopology.getTargets("2") - targets1 should contain key "3" - } - } -} - -object GearpumpStormTopologySpec { - def newEmptyConfig: JMap[AnyRef, AnyRef] = { - new JHashMap[AnyRef, AnyRef] - } - - def newJavaConfig(key: AnyRef, value: AnyRef): JMap[AnyRef, AnyRef] = { - val config = new JHashMap[AnyRef, AnyRef] - config.put(key, value) - config - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala ---------------------------------------------------------------------- diff --git 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala deleted file mode 100644 index f454145..0000000 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.experiments.storm.topology - -import java.util.{List => JList} -import scala.collection.JavaConverters._ - -import backtype.storm.task.GeneralTopologyContext -import backtype.storm.tuple.Fields -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -import io.gearpump.TimeStamp - -class GearpumpTupleSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - - property("GearpumpTuple should create Storm Tuple") { - val tupleGen = for { - values <- Gen.listOf[String](Gen.alphaStr).map(_.distinct.asJava.asInstanceOf[JList[AnyRef]]) - sourceTaskId <- Gen.chooseNum[Int](0, Int.MaxValue) - sourceStreamId <- Gen.alphaStr - } yield new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, null) - - forAll(tupleGen, Gen.alphaStr, Gen.chooseNum[Long](0, Long.MaxValue)) { - (gearpumpTuple: GearpumpTuple, componentId: String, timestamp: TimeStamp) => - val topologyContext = mock[GeneralTopologyContext] - val fields = new Fields(gearpumpTuple.values.asScala.map(_.asInstanceOf[String]): _*) - when(topologyContext.getComponentId(gearpumpTuple.sourceTaskId)).thenReturn(componentId) - when(topologyContext.getComponentOutputFields( - componentId, gearpumpTuple.sourceStreamId)).thenReturn(fields) - - val tuple = gearpumpTuple.toTuple(topologyContext, timestamp) - - tuple shouldBe a[TimedTuple] - val timedTuple = tuple.asInstanceOf[TimedTuple] - timedTuple.getValues shouldBe gearpumpTuple.values - timedTuple.getSourceTask shouldBe gearpumpTuple.sourceTaskId - timedTuple.getSourceComponent shouldBe componentId - timedTuple.getSourceStreamId shouldBe gearpumpTuple.sourceStreamId - timedTuple.getMessageId shouldBe null - timedTuple.getFields shouldBe fields - timedTuple.timestamp shouldBe timestamp - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala deleted file mode 100644 index 27c02fe..0000000 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.storm.util - -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{Matchers, WordSpec} - -import io.gearpump.experiments.storm.partitioner.StormPartitioner -import io.gearpump.experiments.storm.topology.GearpumpStormTopology -import io.gearpump.streaming.Processor -import io.gearpump.streaming.task.Task - -class GraphBuilderSpec extends WordSpec with Matchers with MockitoSugar { - - "GraphBuilder" should { - "build Graph from Storm topology" in { - val topology = mock[GearpumpStormTopology] - - val sourceId = "source" - val sourceProcessor = mock[Processor[Task]] - val targetId = "target" - val targetProcessor = mock[Processor[Task]] - - when(topology.getProcessors).thenReturn( - Map(sourceId -> sourceProcessor, targetId -> targetProcessor)) - when(topology.getTargets(sourceId)).thenReturn(Map(targetId -> targetProcessor)) - when(topology.getTargets(targetId)).thenReturn(Map.empty[String, Processor[Task]]) - - val graph = GraphBuilder.build(topology) - - graph.edges.size shouldBe 1 - val (from, edge, to) = graph.edges.head - from shouldBe sourceProcessor - edge shouldBe a[StormPartitioner] - to shouldBe targetProcessor - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala deleted file mode 100644 index 599ea8d..0000000 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.storm.util - -import java.util.{List => JList} -import scala.collection.JavaConverters._ - -import backtype.storm.generated.GlobalStreamId -import backtype.storm.grouping.CustomStreamGrouping -import backtype.storm.task.TopologyContext -import backtype.storm.tuple.Fields -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -import io.gearpump.experiments.storm.util.GrouperSpec.Value - -class GrouperSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - - val taskIdGen = Gen.chooseNum[Int](0, 1000) - val valuesGen = Gen.listOf[String](Gen.alphaStr) - .map(_.asJava.asInstanceOf[JList[AnyRef]]) - val numTasksGen = Gen.chooseNum[Int](1, 1000) - - property("GlobalGrouper should always return partition 0") { - forAll(taskIdGen, valuesGen) { (taskId: Int, values: JList[AnyRef]) => - val grouper = new GlobalGrouper - grouper.getPartitions(taskId, values) shouldBe List(0) - } - } - - property("NoneGrouper should returns partition in the range [0, numTasks)") { - forAll(taskIdGen, valuesGen, numTasksGen) { - (taskId: Int, values: JList[AnyRef], numTasks: Int) => - val grouper = new 
NoneGrouper(numTasks) - val partitions = grouper.getPartitions(taskId, values) - partitions.size shouldBe 1 - partitions.head should (be >= 0 and be < numTasks) - } - } - - property("ShuffleGrouper should return partition in the range [0, numTasks)") { - forAll(taskIdGen, valuesGen, numTasksGen) { - (taskId: Int, values: JList[AnyRef], numTasks: Int) => - val grouper = new ShuffleGrouper(numTasks) - val partitions = grouper.getPartitions(taskId, values) - partitions.size shouldBe 1 - partitions.head should (be >= 0 and be < numTasks) - } - } - - property("FieldsGrouper should return partition according to fields") { - forAll(taskIdGen, numTasksGen) { - (taskId: Int, numTasks: Int) => - val values = 0.until(numTasks).map(i => new Value(i)) - val fields = values.map(_.toString) - val outFields = new Fields(fields: _*) - values.flatMap { v => - val groupFields = new Fields(v.toString) - val grouper = new FieldsGrouper(outFields, groupFields, numTasks) - grouper.getPartitions(taskId, - values.toList.asJava.asInstanceOf[JList[AnyRef]]) - }.distinct.size shouldBe numTasks - } - } - - property("AllGrouper should return all partitions") { - forAll(taskIdGen, numTasksGen, valuesGen) { - (taskId: Int, numTasks: Int, values: JList[AnyRef]) => - val grouper = new AllGrouper(numTasks) - val partitions = grouper.getPartitions(taskId, values) - partitions.distinct.size shouldBe numTasks - partitions.min shouldBe 0 - partitions.max shouldBe (numTasks - 1) - } - } - - property("CustomGrouper should return partitions specified by user") { - val grouping = mock[CustomStreamGrouping] - val grouper = new CustomGrouper(grouping) - val topologyContext = mock[TopologyContext] - val globalStreamId = mock[GlobalStreamId] - val sourceTasks = mock[JList[Integer]] - - grouper.prepare(topologyContext, globalStreamId, sourceTasks) - - verify(grouping).prepare(topologyContext, globalStreamId, sourceTasks) - - forAll(taskIdGen, valuesGen, numTasksGen) {(taskId: Int, values: JList[AnyRef], taskNum: 
Int) => - 0.until(taskNum).foreach { i => - when(grouping.chooseTasks(taskId, values)).thenReturn(List(new Integer(i)).asJava) - grouper.getPartitions(taskId, values) shouldBe List(i) - } - } - } -} - -object GrouperSpec { - class Value(val i: Int) extends AnyRef { - - override def toString: String = s"$i" - - override def hashCode(): Int = i - - override def equals(other: Any): Boolean = { - if (other.isInstanceOf[Value]) { - this.i == other.asInstanceOf[Value].i - } else { - false - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala deleted file mode 100644 index 1a28694..0000000 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.experiments.storm.util - -import java.util.{List => JList, Map => JMap} -import scala.collection.JavaConverters._ - -import backtype.storm.generated.Grouping -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -import io.gearpump._ -import io.gearpump.experiments.storm.topology.GearpumpTuple -import io.gearpump.streaming.MockUtil - -class StormOutputCollectorSpec - extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - - val stormTaskId = 0 - val streamIdGen = Gen.alphaStr - val valuesGen = Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]]) - val timestampGen = Gen.chooseNum[Long](0L, 1000L) - - property("StormOutputCollector emits tuple values into a stream") { - forAll(timestampGen, streamIdGen, valuesGen) { - (timestamp: TimeStamp, streamId: String, values: JList[AnyRef]) => - val targets = mock[JMap[String, JMap[String, Grouping]]] - val taskToComponent = mock[JMap[Integer, String]] - val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => - (Map[String, Array[Int]], JList[Integer])] - val targetPartitions = mock[Map[String, Array[Int]]] - val targetStormTaskIds = mock[JList[Integer]] - when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions, - targetStormTaskIds)) - val taskContext = MockUtil.mockTaskContext - val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent, - targets, getTargetPartitionsFn, taskContext, LatestTime) - - when(targets.containsKey(streamId)).thenReturn(false) - stormOutputCollector.emit(streamId, values) shouldBe StormOutputCollector.EMPTY_LIST - verify(taskContext, times(0)).output(anyObject[Message]) - - when(targets.containsKey(streamId)).thenReturn(true) - stormOutputCollector.setTimestamp(timestamp) - stormOutputCollector.emit(streamId, values) shouldBe 
targetStormTaskIds - verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({ - case Message(tuple: GearpumpTuple, t) => - val expected = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions) - tuple == expected && t == timestamp - })) - } - } - - property("StormOutputCollector emit direct to a task") { - val idGen = Gen.chooseNum[Int](0, 1000) - val targetGen = Gen.alphaStr - forAll(idGen, targetGen, timestampGen, streamIdGen, valuesGen) { - (id: Int, target: String, timestamp: Long, streamId: String, values: JList[AnyRef]) => - val targets = mock[JMap[String, JMap[String, Grouping]]] - val taskToComponent = mock[JMap[Integer, String]] - when(taskToComponent.get(id)).thenReturn(target) - val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => - (Map[String, Array[Int]], JList[Integer])] - val targetPartitions = mock[Map[String, Array[Int]]] - val targetStormTaskIds = mock[JList[Integer]] - when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions, - targetStormTaskIds)) - val taskContext = MockUtil.mockTaskContext - val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent, - targets, getTargetPartitionsFn, taskContext, LatestTime) - - when(targets.containsKey(streamId)).thenReturn(false) - verify(taskContext, times(0)).output(anyObject[Message]) - - when(targets.containsKey(streamId)).thenReturn(true) - stormOutputCollector.setTimestamp(timestamp) - stormOutputCollector.emitDirect(id, streamId, values) - val partitions = Array(StormUtil.stormTaskIdToGearpump(id).index) - verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({ - case Message(tuple: GearpumpTuple, t) => { - val expected = new GearpumpTuple(values, stormTaskId, streamId, - Map(target -> partitions)) - - val result = tuple == expected && t == timestamp - result - } - })) - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala deleted file mode 100644 index d8a29e8..0000000 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.storm.util - -import java.util.{HashMap => JHashMap, List => JList, Map => JMap} -import scala.collection.JavaConverters._ - -import akka.actor.ExtendedActorSystem -import backtype.storm.utils.Utils -import com.esotericsoftware.kryo.Kryo -import org.scalacheck.Gen -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -import io.gearpump.cluster.UserConfig -import io.gearpump.experiments.storm.topology.GearpumpTuple -import io.gearpump.experiments.storm.util.StormConstants._ -import io.gearpump.streaming.MockUtil - -class StormSerializerPoolSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - - property("StormSerializerPool should create and manage StormSerializer") { - val taskContext = MockUtil.mockTaskContext - val serializerPool = new StormSerializationFramework - val system = taskContext.system.asInstanceOf[ExtendedActorSystem] - implicit val actorSystem = system - val stormConfig = Utils.readDefaultConfig.asInstanceOf[JMap[AnyRef, AnyRef]] - val config = UserConfig.empty.withValue[JMap[AnyRef, AnyRef]](STORM_CONFIG, stormConfig) - serializerPool.init(system, config) - serializerPool.get shouldBe a[StormSerializer] - } - - property("StormSerializer should serialize and deserialize GearpumpTuple") { - val tupleGen = for { - values <- Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]]) - sourceTaskId <- Gen.chooseNum[Int](0, Int.MaxValue) - sourceStreamId <- Gen.alphaStr - } yield new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, null) - - val kryo = new Kryo - forAll(tupleGen) { (tuple: GearpumpTuple) => - val serializer = new StormSerializer(kryo) - serializer.deserialize(serializer.serialize(tuple)) shouldBe tuple - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala 
---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala deleted file mode 100644 index a9b93c9..0000000 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package io.gearpump.experiments.storm.util

import java.lang.{Boolean => JBoolean, Long => JLong}
import java.util.{HashMap => JHashMap, Map => JMap}
import scala.collection.JavaConverters._

import backtype.storm.Config
import backtype.storm.generated.StormTopology
import org.apache.storm.shade.org.json.simple.JSONValue
import org.scalacheck.Gen
import org.scalatest.mock.MockitoSugar
import org.scalatest.prop.PropertyChecks
import org.scalatest.{Matchers, PropSpec}

import io.gearpump.cluster.UserConfig
import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
import io.gearpump.experiments.storm.util.StormConstants._
import io.gearpump.experiments.storm.util.StormUtil._
import io.gearpump.streaming.MockUtil
import io.gearpump.streaming.task.TaskId

/**
 * Property tests for the helpers imported from `StormUtil`: task id conversion, component
 * lookup, JSON parsing, typed config getters, `mod` and `ackEnabled`.
 */
class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {

  property("convert Storm task ids to gearpump TaskIds and back") {
    // Storm id -> gearpump TaskId -> Storm id
    forAll(Gen.chooseNum[Int](0, Int.MaxValue)) { (stormTaskId: Int) =>
      val gearpumpId = stormTaskIdToGearpump(stormTaskId)
      gearpumpTaskIdToStorm(gearpumpId) shouldBe stormTaskId
    }

    // gearpump TaskId -> Storm id -> gearpump TaskId, with both fields capped at
    // Int.MaxValue >> 16
    val fieldUpperBound = Int.MaxValue >> 16
    val processorIdGen = Gen.chooseNum[Int](0, fieldUpperBound)
    val indexGen = Gen.chooseNum[Int](0, fieldUpperBound)
    forAll(processorIdGen, indexGen) { (processorId: Int, index: Int) =>
      val taskId = TaskId(processorId, index)
      val stormId = gearpumpTaskIdToStorm(taskId)
      stormTaskIdToGearpump(stormId) shouldBe taskId
    }
  }

  property("get GearpumpStormComponent from user config") {
    val taskContext = MockUtil.mockTaskContext
    val topology = TopologyUtil.getTestTopology
    implicit val actorSystem = taskContext.system
    val baseConfig = UserConfig.empty
      .withValue[StormTopology](STORM_TOPOLOGY, topology)
      .withValue[JMap[AnyRef, AnyRef]](STORM_CONFIG, new JHashMap[AnyRef, AnyRef])

    // Resolves the component registered under `componentId` in the test topology.
    def componentFor(componentId: String) = {
      val config = baseConfig.withString(STORM_COMPONENT, componentId)
      getGearpumpStormComponent(taskContext, config)(taskContext.system)
    }

    for (spoutId <- topology.get_spouts.asScala.keys) {
      componentFor(spoutId) shouldBe a[GearpumpSpout]
    }
    for (boltId <- topology.get_bolts.asScala.keys) {
      componentFor(boltId) shouldBe a[GearpumpBolt]
    }
  }

  property("parse json to map") {
    // Maps whose every key maps to itself, rendered to JSON and parsed back.
    val mapGen = Gen.listOf[String](Gen.alphaStr).map { keys =>
      keys.map(k => k -> k).toMap.asJava.asInstanceOf[JMap[AnyRef, AnyRef]]
    }
    forAll(mapGen) { (map: JMap[AnyRef, AnyRef]) =>
      val json = JSONValue.toJSONString(map)
      parseJsonStringToMap(json) shouldBe map
    }

    // null, the empty string and a bare number are all expected to yield an empty map.
    val invalidJsonGen: Gen[String] = Gen.oneOf(null, "", "1")
    forAll(invalidJsonGen) { (invalidJson: String) =>
      val parsed = parseJsonStringToMap(invalidJson)
      parsed shouldBe empty
      parsed shouldBe a[JMap[_, _]]
    }
  }

  property("get int from config") {
    val key = "int"
    val conf: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]

    // Missing key and explicit null both read as None.
    getInt(conf, key) shouldBe None
    conf.put(key, null)
    getInt(conf, key) shouldBe None

    forAll(Gen.chooseNum[Int](Int.MinValue, Int.MaxValue)) { (int: Int) =>
      conf.put(key, new Integer(int))
      getInt(conf, key) shouldBe Some(int)
    }

    // Long values within the Int range are accepted as well.
    // NOTE(review): the expectation is Some(long), i.e. an Option[Long] compared against
    // getInt's result — presumably relies on numeric equality; confirm.
    forAll(Gen.chooseNum[Long](Int.MinValue, Int.MaxValue)) { (long: Long) =>
      conf.put(key, new JLong(long))
      getInt(conf, key) shouldBe Some(long)
    }

    forAll(Gen.alphaStr) { (s: String) =>
      conf.put(key, s)
      an[IllegalArgumentException] should be thrownBy getInt(conf, key)
    }
  }

  property("get boolean from config") {
    val key = "boolean"
    val conf: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]

    // Missing key and explicit null both read as None.
    getBoolean(conf, key) shouldBe None
    conf.put(key, null)
    getBoolean(conf, key) shouldBe None

    forAll(Gen.oneOf(true, false)) { (boolean: Boolean) =>
      conf.put(key, new JBoolean(boolean))
      getBoolean(conf, key) shouldBe Some(boolean)
    }

    forAll(Gen.alphaStr) { (s: String) =>
      conf.put(key, s)
      an[IllegalArgumentException] should be thrownBy getBoolean(conf, key)
    }
  }

  property("mod should be correct") {
    // (dividend, divisor, expected result)
    val expectations = Seq(
      (10, 5, 0),
      (10, 6, 4),
      (10, -3, -2),
      (-2, 5, 3),
      (-1, -2, -1)
    )
    expectations.foreach { case (dividend, divisor, expected) =>
      mod(dividend, divisor) shouldBe expected
    }
  }

  property("get whether ack enabled") {
    val ackersKey = Config.TOPOLOGY_ACKER_EXECUTORS
    val conf: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]

    // Key absent -> disabled; zero -> disabled; key present with null value -> enabled.
    ackEnabled(conf) shouldBe false
    conf.put(ackersKey, new Integer(0))
    ackEnabled(conf) shouldBe false
    conf.put(ackersKey, null)
    ackEnabled(conf) shouldBe true

    // Any non-zero executor count enables acking.
    forAll(Gen.chooseNum[Int](Int.MinValue, Int.MaxValue)) { (ackers: Int) =>
      conf.put(ackersKey, new Integer(ackers))
      ackEnabled(conf) shouldBe (ackers != 0)
    }
  }
}
package io.gearpump.experiments.storm.util

import backtype.storm.generated.StormTopology
import backtype.storm.testing.{TestGlobalCount, TestWordCounter, TestWordSpout}
import backtype.storm.topology.TopologyBuilder
import backtype.storm.tuple.Fields
import backtype.storm.utils.Utils

/**
 * Test fixture that exposes a couple of shared ids and builds a small Storm topology.
 */
object TopologyUtil {
  val DEFAULT_STREAM_ID = Utils.DEFAULT_STREAM_ID
  val DEFAULT_COMPONENT_ID = "component"

  /**
   * Builds a topology with two `TestWordSpout`s ("1" with parallelism hint 5, "2" with 3),
   * a `TestWordCounter` bolt "3" (hint 3) fields-grouped on "word" from both spouts, and a
   * `TestGlobalCount` bolt "4" globally grouped on spout "1".
   */
  def getTestTopology: StormTopology = {
    val builder = new TopologyBuilder

    builder.setSpout("1", new TestWordSpout(true), 5)
    builder.setSpout("2", new TestWordSpout(true), 3)

    // Chain one fields grouping per spout onto the word-count bolt's declarer.
    val wordCounter = builder.setBolt("3", new TestWordCounter(), 3)
    Seq("1", "2").foldLeft(wordCounter) { (declarer, spoutId) =>
      declarer.fieldsGrouping(spoutId, new Fields("word"))
    }

    val globalCounter = builder.setBolt("4", new TestGlobalCount())
    globalCounter.globalGrouping("1")

    builder.createTopology()
  }
}
package org.apache.gearpump.experiments.storm.partitioner

import java.util.{List => JList}
import scala.collection.JavaConverters._

import org.scalacheck.Gen
import org.scalatest.prop.PropertyChecks
import org.scalatest.{Matchers, PropSpec}

import org.apache.gearpump.Message
import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
import org.apache.gearpump.partitioner.Partitioner

/**
 * Property test for [[StormPartitioner]]: a partitioner created for a component listed in
 * the tuple's target partitions must return exactly those partitions, and a partitioner for
 * any other component must return the unknown partition id.
 */
class StormPartitionerSpec extends PropSpec with PropertyChecks with Matchers {

  property("StormPartitioner should get partitions directed by message and target") {
    val idGen = Gen.chooseNum[Int](0, Int.MaxValue)
    // At least two distinct component names: all but the last become targets.
    val componentsGen = Gen.listOf[String](Gen.alphaStr).map(_.distinct).suchThat(_.size > 1)
    // Non-empty, de-duplicated, ascending partition ids.
    val partitionsGen = Gen.listOf[Int](idGen).suchThat(_.nonEmpty).map(_.distinct.sorted.toArray)
    // The tuple is produced lazily because its target partitions depend on other generators.
    val tupleFactoryGen = for {
      values <- Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]])
      sourceTaskId <- idGen
      sourceStreamId <- Gen.alphaStr
    } yield { (targetPartitions: Map[String, Array[Int]]) =>
      new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, targetPartitions)
    }

    forAll(tupleFactoryGen, idGen, componentsGen, partitionsGen) {
      (tupleFactory: Map[String, Array[Int]] => GearpumpTuple, id: Int,
        components: List[String], partitions: Array[Int]) =>
        val currentPartitionId = id
        val targetPartitions = components.init.map(_ -> partitions).toMap
        val tuple = tupleFactory(targetPartitions)

        // Every declared target gets back the partitions stored in the tuple.
        // NOTE(review): ps.last + 1 wraps to Int.MinValue when ps.last == Int.MaxValue, which
        // idGen can produce — presumably getPartitions ignores partitionNum here; confirm.
        for ((target, ps) <- targetPartitions) {
          val targetPartitioner = new StormPartitioner(target)
          ps shouldBe targetPartitioner.getPartitions(Message(tuple), ps.last + 1,
            currentPartitionId)
        }

        // The last component was left out of the targets on purpose.
        val partitionNum = id
        val nonTargetPartitioner = new StormPartitioner(components.last)
        val unknown =
          nonTargetPartitioner.getPartitions(Message(tuple), partitionNum, currentPartitionId)
        unknown shouldBe List(Partitioner.UNKNOWN_PARTITION_ID)
    }
  }
}
package org.apache.gearpump.experiments.storm.processor

import java.util.{List => JList}

import backtype.storm.tuple.Tuple
import backtype.storm.utils.Utils
import org.apache.gearpump.experiments.storm.util.StormOutputCollector
import org.mockito.Mockito._
import org.scalacheck.Gen
import org.scalatest.mock.MockitoSugar
import org.scalatest.prop.PropertyChecks
import org.scalatest.{Matchers, PropSpec}

import scala.collection.JavaConverters._

/**
 * Property tests for [[StormBoltOutputCollector]], run against a mocked
 * [[StormOutputCollector]].
 */
class StormBoltOutputCollectorSpec
  extends PropSpec with PropertyChecks with Matchers with MockitoSugar {

  property("StormBoltOutputCollector should call StormOutputCollector") {
    // Values are drawn from alpha strings, alpha chars and ints in [0, 1000].
    val valGen = Gen.oneOf(Gen.alphaStr, Gen.alphaChar, Gen.chooseNum[Int](0, 1000))
    val valuesGen = Gen.listOf[AnyRef](valGen)

    forAll(valuesGen) { (values: List[AnyRef]) =>
      // A fresh mock per sample keeps the verification count at exactly one call.
      val underlying = mock[StormOutputCollector]
      val javaValues = values.asJava
      val defaultStream = Utils.DEFAULT_STREAM_ID

      new StormBoltOutputCollector(underlying).emit(defaultStream, null, javaValues)

      // Stream id and values are expected to be forwarded; the null anchors are not.
      verify(underlying).emit(defaultStream, javaValues)
    }
  }

  property("StormBoltOutputCollector should throw on fail") {
    val underlying = mock[StormOutputCollector]
    val failedTuple = mock[Tuple]
    val boltCollector = new StormBoltOutputCollector(underlying)

    an[Exception] should be thrownBy {
      boltCollector.fail(failedTuple)
    }
  }
}
---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala new file mode 100644 index 0000000..2111df6 --- /dev/null +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormProcessorSpec.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.gearpump.experiments.storm.processor

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.task.StartTime
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.scalatest.{Matchers, WordSpec}

/**
 * Unit tests for [[StormProcessor]]: checks that lifecycle calls and messages are delegated
 * to the wrapped (mocked) [[GearpumpBolt]].
 */
class StormProcessorSpec extends WordSpec with Matchers with MockitoSugar {

  /**
   * Builds a [[StormProcessor]] around `bolt` with a mocked task context and an empty user
   * config. Stub `bolt` before calling this so the stubs are in place at construction time.
   */
  private def processorFor(bolt: GearpumpBolt): StormProcessor =
    new StormProcessor(bolt, MockUtil.mockTaskContext, UserConfig.empty)

  "StormProcessor" should {
    // The processor wraps a GearpumpBolt, so the label names the bolt (it previously said
    // "GearpumpSpout", a leftover from the producer spec).
    "start GearpumpBolt onStart" in {
      val startTime = mock[StartTime]
      val gearpumpBolt = mock[GearpumpBolt]
      when(gearpumpBolt.getTickFrequency).thenReturn(None)
      val stormProcessor = processorFor(gearpumpBolt)

      stormProcessor.onStart(startTime)

      verify(gearpumpBolt).start(startTime)
    }

    "pass message to GearpumpBolt onNext" in {
      val message = mock[Message]
      val gearpumpBolt = mock[GearpumpBolt]
      val freq = 5
      when(gearpumpBolt.getTickFrequency).thenReturn(Some(freq))
      val stormProcessor = processorFor(gearpumpBolt)

      // An ordinary message is handed to the bolt unchanged.
      stormProcessor.onNext(message)
      verify(gearpumpBolt).next(message)

      // The TICK marker is turned into a tick call carrying the stubbed frequency.
      stormProcessor.onNext(StormProcessor.TICK)
      verify(gearpumpBolt).tick(freq)
    }
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.experiments.storm.producer

import akka.testkit.TestProbe
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.task.StartTime
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.scalatest.{Matchers, WordSpec}

/**
 * Unit tests for [[StormProducer]]: checks that lifecycle calls and messages are delegated
 * to the wrapped (mocked) [[GearpumpSpout]] and that the expected follow-up message reaches
 * the task actor.
 */
class StormProducerSpec extends WordSpec with Matchers with MockitoSugar {

  /**
   * Builds a [[StormProducer]] around `spout` with an empty user config and a mocked task
   * context whose `self` is a [[TestProbe]]; the probe is returned so tests can assert on
   * what the producer sends to itself. Stub `spout` before calling this.
   */
  private def producerWithProbe(spout: GearpumpSpout): (StormProducer, TestProbe) = {
    val taskContext = MockUtil.mockTaskContext
    implicit val actorSystem = taskContext.system
    val taskActor = TestProbe()
    when(taskContext.self).thenReturn(taskActor.ref)
    (new StormProducer(spout, taskContext, UserConfig.empty), taskActor)
  }

  "StormProducer" should {
    "start GearpumpSpout onStart" in {
      val startTime = mock[StartTime]
      val gearpumpSpout = mock[GearpumpSpout]
      when(gearpumpSpout.getMessageTimeout).thenReturn(None)
      val (stormProducer, taskActor) = producerWithProbe(gearpumpSpout)

      stormProducer.onStart(startTime)

      verify(gearpumpSpout).start(startTime)
      taskActor.expectMsg(Message("start"))
    }

    // The producer wraps a GearpumpSpout, so the label names the spout (it previously said
    // "GearpumpBolt", a leftover from the processor spec).
    "pass message to GearpumpSpout onNext" in {
      val message = mock[Message]
      val gearpumpSpout = mock[GearpumpSpout]
      val timeout = 5L
      when(gearpumpSpout.getMessageTimeout).thenReturn(Some(timeout))
      val (stormProducer, taskActor) = producerWithProbe(gearpumpSpout)

      // An ordinary message is handed to the spout and a "continue" message is expected
      // at the task actor.
      stormProducer.onNext(message)
      verify(gearpumpSpout).next(message)
      taskActor.expectMsg(Message("continue"))

      // The TIMEOUT marker is turned into a timeout call carrying the stubbed value.
      stormProducer.onNext(StormProducer.TIMEOUT)
      verify(gearpumpSpout).timeout(timeout)
    }
  }
}
