http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala new file mode 100644 index 0000000..49afe05 --- /dev/null +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.experiments.storm.producer + +import java.util.{List => JList} + +import backtype.storm.spout.ISpout +import backtype.storm.utils.Utils +import org.apache.gearpump.experiments.storm.util.StormOutputCollector +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +import scala.collection.JavaConverters._ + +class StormSpoutOutputCollectorSpec + extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + + property("StormSpoutOutputCollector should call StormOutputCollector") { + val valGen = Gen.oneOf(Gen.alphaStr, Gen.alphaChar, Gen.chooseNum[Int](0, 1000)) + val valuesGen = Gen.listOf[AnyRef](valGen) + + forAll(valuesGen) { (values: List[AnyRef]) => + val collector = mock[StormOutputCollector] + val spout = mock[ISpout] + val streamId = Utils.DEFAULT_STREAM_ID + val spoutCollector = new StormSpoutOutputCollector(collector, spout, false) + spoutCollector.emit(streamId, values.asJava, null) + verify(collector).emit(streamId, values.asJava) + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala new file mode 100644 index 0000000..bdea50c --- /dev/null +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.storm.topology + +import java.util.{Map => JMap} + +import akka.actor.ActorRef +import backtype.storm.spout.{ISpout, SpoutOutputCollector} +import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, TopologyContext} +import backtype.storm.tuple.Tuple +import org.apache.gearpump.experiments.storm.producer.StormSpoutOutputCollector +import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout} +import org.apache.gearpump.experiments.storm.util.StormOutputCollector +import org.apache.gearpump.streaming.task.{StartTime, TaskContext, TaskId} +import org.apache.gearpump.streaming.{DAG, MockUtil} +import org.apache.gearpump.{Message, TimeStamp} +import org.mockito.Matchers.{anyObject, eq => mockitoEq} +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +class GearpumpStormComponentSpec + extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + + property("GearpumpSpout lifecycle") { + val config = mock[JMap[AnyRef, AnyRef]] + val spout = mock[ISpout] + val taskContext = MockUtil.mockTaskContext + val appMaster = mock[ActorRef] + when(taskContext.appMaster).thenReturn(appMaster) + val getDAG = mock[ActorRef => DAG] + val dag = mock[DAG] + when(getDAG(appMaster)).thenReturn(dag) + val getTopologyContext = mock[(DAG, TaskId) => TopologyContext] + val topologyContext = mock[TopologyContext] + when(getTopologyContext(dag, taskContext.taskId)).thenReturn(topologyContext) + val getOutputCollector = mock[(TaskContext, TopologyContext) => StormSpoutOutputCollector] + val outputCollector = mock[StormSpoutOutputCollector] + when(getOutputCollector(taskContext, topologyContext)).thenReturn(outputCollector) + + val gearpumpSpout = GearpumpSpout(config, spout, getDAG, getTopologyContext, + getOutputCollector, ackEnabled = false, taskContext) + + // Start + val startTime = mock[StartTime] + gearpumpSpout.start(startTime) + + verify(spout).open(mockitoEq(config), mockitoEq(topologyContext), + anyObject[SpoutOutputCollector]) + + // Next + val message = mock[Message] + gearpumpSpout.next(message) + + verify(spout).nextTuple() + } + + property("GearpumpBolt lifecycle") { + val timestampGen = Gen.chooseNum[Long](0L, 1000L) + val freqGen = Gen.chooseNum[Int](1, 100) + forAll(timestampGen, freqGen) { (timestamp: TimeStamp, freq: Int) => + val config = mock[JMap[AnyRef, AnyRef]] + val bolt = mock[IBolt] + val taskContext = MockUtil.mockTaskContext + val appMaster = mock[ActorRef] + when(taskContext.appMaster).thenReturn(appMaster) + val getDAG = mock[ActorRef => DAG] + val dag = mock[DAG] + when(getDAG(appMaster)).thenReturn(dag) + val getTopologyContext = mock[(DAG, TaskId) => TopologyContext] + val topologyContext = mock[TopologyContext] + when(getTopologyContext(dag, taskContext.taskId)).thenReturn(topologyContext) + val getGeneralTopologyContext = mock[DAG => GeneralTopologyContext] + val generalTopologyContext = mock[GeneralTopologyContext] + when(getGeneralTopologyContext(dag)).thenReturn(generalTopologyContext) + val getOutputCollector = mock[(TaskContext, TopologyContext) => StormOutputCollector] + val stormOutputCollector = mock[StormOutputCollector] + when(getOutputCollector(taskContext, topologyContext)).thenReturn(stormOutputCollector) + val getTickTuple = mock[(GeneralTopologyContext, Int) => Tuple] + val tickTuple = mock[Tuple] + when(getTickTuple(mockitoEq(generalTopologyContext), anyObject[Int]())).thenReturn(tickTuple) + val gearpumpBolt = GearpumpBolt(config, bolt, getDAG, getTopologyContext, + getGeneralTopologyContext, getOutputCollector, getTickTuple, taskContext) + + // Start + val startTime = mock[StartTime] + gearpumpBolt.start(startTime) + + verify(bolt).prepare(mockitoEq(config), mockitoEq(topologyContext), + anyObject[OutputCollector]) + + // Next + val gearpumpTuple = mock[GearpumpTuple] + val tuple = mock[Tuple] + when(gearpumpTuple.toTuple(generalTopologyContext, timestamp)).thenReturn(tuple) + val message = Message(gearpumpTuple, timestamp) + gearpumpBolt.next(message) + + verify(stormOutputCollector).setTimestamp(timestamp) + verify(bolt).execute(tuple) + + // Tick + gearpumpBolt.tick(freq) + verify(bolt).execute(tickTuple) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala new file mode 100644 index 0000000..ef383ad --- /dev/null +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.experiments.storm.topology + +import java.util.{HashMap => JHashMap, Map => JMap} + +import backtype.storm.Config +import org.apache.gearpump.experiments.storm.processor.StormProcessor +import org.apache.gearpump.experiments.storm.producer.StormProducer +import org.apache.gearpump.experiments.storm.util.TopologyUtil +import org.apache.gearpump.streaming.MockUtil +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, WordSpec} + +import scala.collection.JavaConverters._ + +class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar { + import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopologySpec._ + + "GearpumpStormTopology" should { + "merge configs with defined priority" in { + val stormTopology = TopologyUtil.getTestTopology + val name = "name" + val sysVal = "sys" + val sysConfig = newJavaConfig(name, sysVal) + val appVal = "app" + val appConfig = newJavaConfig(name, appVal) + + implicit val system = MockUtil.system + val topology1 = new GearpumpStormTopology("topology1", stormTopology, newEmptyConfig, + newEmptyConfig) + topology1.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology1" + topology1.getStormConfig should not contain name + + val topology2 = new GearpumpStormTopology("topology2", stormTopology, sysConfig, + newEmptyConfig) + topology2.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology2" + topology2.getStormConfig.get(name) shouldBe sysVal + + val topology3 = new GearpumpStormTopology("topology3", stormTopology, sysConfig, appConfig) + topology3.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology3" + topology3.getStormConfig.get(name) shouldBe appVal + } + + "create Gearpump processors from Storm topology" in { + val stormTopology = TopologyUtil.getTestTopology + implicit val system = MockUtil.system + val gearpumpStormTopology = + GearpumpStormTopology("app", stormTopology, null) + val processors = gearpumpStormTopology.getProcessors + stormTopology.get_spouts().asScala.foreach { case (spoutId, _) => + val processor = processors(spoutId) + processor.taskClass shouldBe classOf[StormProducer] + processor.description shouldBe spoutId + } + stormTopology.get_bolts().asScala.foreach { case (boltId, _) => + val processor = processors(boltId) + processor.taskClass shouldBe classOf[StormProcessor] + processor.description shouldBe boltId + } + } + + "get target processors from source id" in { + val stormTopology = TopologyUtil.getTestTopology + implicit val system = MockUtil.system + val sysConfig = new JHashMap[AnyRef, AnyRef] + val gearpumpStormTopology = + GearpumpStormTopology("app", stormTopology, null) + val targets0 = gearpumpStormTopology.getTargets("1") + targets0 should contain key "3" + targets0 should contain key "4" + val targets1 = gearpumpStormTopology.getTargets("2") + targets1 should contain key "3" + } + } +} + +object GearpumpStormTopologySpec { + def newEmptyConfig: JMap[AnyRef, AnyRef] = { + new JHashMap[AnyRef, AnyRef] + } + + def newJavaConfig(key: AnyRef, value: AnyRef): JMap[AnyRef, AnyRef] = { + val config = new JHashMap[AnyRef, AnyRef] + config.put(key, value) + config + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala new file mode 100644 index 0000000..f12e54f --- /dev/null +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.storm.topology + +import java.util.{List => JList} + +import backtype.storm.task.GeneralTopologyContext +import backtype.storm.tuple.Fields +import org.apache.gearpump.TimeStamp +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +import scala.collection.JavaConverters._ + +class GearpumpTupleSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + + property("GearpumpTuple should create Storm Tuple") { + val tupleGen = for { + values <- Gen.listOf[String](Gen.alphaStr).map(_.distinct.asJava.asInstanceOf[JList[AnyRef]]) + sourceTaskId <- Gen.chooseNum[Int](0, Int.MaxValue) + sourceStreamId <- Gen.alphaStr + } yield new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, null) + + forAll(tupleGen, Gen.alphaStr, Gen.chooseNum[Long](0, Long.MaxValue)) { + (gearpumpTuple: GearpumpTuple, componentId: String, timestamp: TimeStamp) => + val topologyContext = mock[GeneralTopologyContext] + val fields = new Fields(gearpumpTuple.values.asScala.map(_.asInstanceOf[String]): _*) + when(topologyContext.getComponentId(gearpumpTuple.sourceTaskId)).thenReturn(componentId) + when(topologyContext.getComponentOutputFields( + componentId, gearpumpTuple.sourceStreamId)).thenReturn(fields) + + val tuple = gearpumpTuple.toTuple(topologyContext, timestamp) + + tuple shouldBe a[TimedTuple] + val timedTuple = tuple.asInstanceOf[TimedTuple] + timedTuple.getValues shouldBe gearpumpTuple.values + timedTuple.getSourceTask shouldBe gearpumpTuple.sourceTaskId + timedTuple.getSourceComponent shouldBe componentId + timedTuple.getSourceStreamId shouldBe gearpumpTuple.sourceStreamId + timedTuple.getMessageId shouldBe null + timedTuple.getFields shouldBe fields + timedTuple.timestamp shouldBe timestamp + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala new file mode 100644 index 0000000..9cf5009 --- /dev/null +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.experiments.storm.util + +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, WordSpec} + +import org.apache.gearpump.experiments.storm.partitioner.StormPartitioner +import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopology +import org.apache.gearpump.streaming.Processor +import org.apache.gearpump.streaming.task.Task + +class GraphBuilderSpec extends WordSpec with Matchers with MockitoSugar { + + "GraphBuilder" should { + "build Graph from Storm topology" in { + val topology = mock[GearpumpStormTopology] + + val sourceId = "source" + val sourceProcessor = mock[Processor[Task]] + val targetId = "target" + val targetProcessor = mock[Processor[Task]] + + when(topology.getProcessors).thenReturn( + Map(sourceId -> sourceProcessor, targetId -> targetProcessor)) + when(topology.getTargets(sourceId)).thenReturn(Map(targetId -> targetProcessor)) + when(topology.getTargets(targetId)).thenReturn(Map.empty[String, Processor[Task]]) + + val graph = GraphBuilder.build(topology) + + graph.edges.size shouldBe 1 + val (from, edge, to) = graph.edges.head + from shouldBe sourceProcessor + edge shouldBe a[StormPartitioner] + to shouldBe targetProcessor + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GrouperSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GrouperSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GrouperSpec.scala new file mode 100644 index 0000000..c1cdb3b --- /dev/null +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GrouperSpec.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.experiments.storm.util + +import java.util.{List => JList} +import scala.collection.JavaConverters._ + +import backtype.storm.generated.GlobalStreamId +import backtype.storm.grouping.CustomStreamGrouping +import backtype.storm.task.TopologyContext +import backtype.storm.tuple.Fields +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +import org.apache.gearpump.experiments.storm.util.GrouperSpec.Value + +class GrouperSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + + val taskIdGen = Gen.chooseNum[Int](0, 1000) + val valuesGen = Gen.listOf[String](Gen.alphaStr) + .map(_.asJava.asInstanceOf[JList[AnyRef]]) + val numTasksGen = Gen.chooseNum[Int](1, 1000) + + property("GlobalGrouper should always return partition 0") { + forAll(taskIdGen, valuesGen) { (taskId: Int, values: JList[AnyRef]) => + val grouper = new GlobalGrouper + grouper.getPartitions(taskId, values) shouldBe List(0) + } + } + + property("NoneGrouper should returns partition in the range [0, numTasks)") { + forAll(taskIdGen, valuesGen, numTasksGen) { + (taskId: Int, values: JList[AnyRef], numTasks: Int) => + val grouper = new NoneGrouper(numTasks) + val partitions = grouper.getPartitions(taskId, values) + partitions.size shouldBe 1 + partitions.head should (be >= 0 and be < numTasks) + } + } + + property("ShuffleGrouper should return partition in the range [0, numTasks)") { + forAll(taskIdGen, valuesGen, numTasksGen) { + (taskId: Int, values: JList[AnyRef], numTasks: Int) => + val grouper = new ShuffleGrouper(numTasks) + val partitions = grouper.getPartitions(taskId, values) + partitions.size shouldBe 1 + partitions.head should (be >= 0 and be < numTasks) + } + } + + property("FieldsGrouper should return partition according to fields") { + forAll(taskIdGen, numTasksGen) { + (taskId: Int, numTasks: Int) => + val values = 0.until(numTasks).map(i => new Value(i)) + val fields = values.map(_.toString) + val outFields = new Fields(fields: _*) + values.flatMap { v => + val groupFields = new Fields(v.toString) + val grouper = new FieldsGrouper(outFields, groupFields, numTasks) + grouper.getPartitions(taskId, + values.toList.asJava.asInstanceOf[JList[AnyRef]]) + }.distinct.size shouldBe numTasks + } + } + + property("AllGrouper should return all partitions") { + forAll(taskIdGen, numTasksGen, valuesGen) { + (taskId: Int, numTasks: Int, values: JList[AnyRef]) => + val grouper = new AllGrouper(numTasks) + val partitions = grouper.getPartitions(taskId, values) + partitions.distinct.size shouldBe numTasks + partitions.min shouldBe 0 + partitions.max shouldBe (numTasks - 1) + } + } + + property("CustomGrouper should return partitions specified by user") { + val grouping = mock[CustomStreamGrouping] + val grouper = new CustomGrouper(grouping) + val topologyContext = mock[TopologyContext] + val globalStreamId = mock[GlobalStreamId] + val sourceTasks = mock[JList[Integer]] + + grouper.prepare(topologyContext, globalStreamId, sourceTasks) + + verify(grouping).prepare(topologyContext, globalStreamId, sourceTasks) + + forAll(taskIdGen, valuesGen, numTasksGen) {(taskId: Int, values: JList[AnyRef], taskNum: Int) => + 0.until(taskNum).foreach { i => + when(grouping.chooseTasks(taskId, values)).thenReturn(List(new Integer(i)).asJava) + grouper.getPartitions(taskId, values) shouldBe List(i) + } + } + } +} + +object GrouperSpec { + class Value(val i: Int) extends AnyRef { + + override def toString: String = s"$i" + + override def hashCode(): Int = i + + override def equals(other: Any): Boolean = { + if (other.isInstanceOf[Value]) { + this.i == other.asInstanceOf[Value].i + } else { + false + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala new file mode 100644 index 0000000..e0e9e61 --- /dev/null +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experiments.storm.util + +import java.util.{List => JList, Map => JMap} +import scala.collection.JavaConverters._ + +import backtype.storm.generated.Grouping +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +import org.apache.gearpump._ +import org.apache.gearpump.experiments.storm.topology.GearpumpTuple +import org.apache.gearpump.streaming.MockUtil + +class StormOutputCollectorSpec + extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + + val stormTaskId = 0 + val streamIdGen = Gen.alphaStr + val valuesGen = Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]]) + val timestampGen = Gen.chooseNum[Long](0L, 1000L) + + property("StormOutputCollector emits tuple values into a stream") { + forAll(timestampGen, streamIdGen, valuesGen) { + (timestamp: TimeStamp, streamId: String, values: JList[AnyRef]) => + val targets = mock[JMap[String, JMap[String, Grouping]]] + val taskToComponent = mock[JMap[Integer, String]] + val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => + (Map[String, Array[Int]], JList[Integer])] + val targetPartitions = mock[Map[String, Array[Int]]] + val targetStormTaskIds = mock[JList[Integer]] + when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions, + targetStormTaskIds)) + val taskContext = MockUtil.mockTaskContext + val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent, + targets, getTargetPartitionsFn, taskContext, LatestTime) + + when(targets.containsKey(streamId)).thenReturn(false) + stormOutputCollector.emit(streamId, values) shouldBe StormOutputCollector.EMPTY_LIST + verify(taskContext, times(0)).output(anyObject[Message]) + + when(targets.containsKey(streamId)).thenReturn(true) + stormOutputCollector.setTimestamp(timestamp) + stormOutputCollector.emit(streamId, values) shouldBe targetStormTaskIds + verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({ + case Message(tuple: GearpumpTuple, t) => + val expected = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions) + tuple == expected && t == timestamp + })) + } + } + + property("StormOutputCollector emit direct to a task") { + val idGen = Gen.chooseNum[Int](0, 1000) + val targetGen = Gen.alphaStr + forAll(idGen, targetGen, timestampGen, streamIdGen, valuesGen) { + (id: Int, target: String, timestamp: Long, streamId: String, values: JList[AnyRef]) => + val targets = mock[JMap[String, JMap[String, Grouping]]] + val taskToComponent = mock[JMap[Integer, String]] + when(taskToComponent.get(id)).thenReturn(target) + val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => + (Map[String, Array[Int]], JList[Integer])] + val targetPartitions = mock[Map[String, Array[Int]]] + val targetStormTaskIds = mock[JList[Integer]] + when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions, + targetStormTaskIds)) + val taskContext = MockUtil.mockTaskContext + val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent, + targets, getTargetPartitionsFn, taskContext, LatestTime) + + when(targets.containsKey(streamId)).thenReturn(false) + verify(taskContext, times(0)).output(anyObject[Message]) + + when(targets.containsKey(streamId)).thenReturn(true) + stormOutputCollector.setTimestamp(timestamp) + stormOutputCollector.emitDirect(id, streamId, values) + val partitions = Array(StormUtil.stormTaskIdToGearpump(id).index) + verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({ + case Message(tuple: GearpumpTuple, t) => { + val expected = new GearpumpTuple(values, stormTaskId, streamId, + Map(target -> partitions)) + + val result = tuple == expected && t == timestamp + result + } + })) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala new file mode 100644 index 0000000..e787c3d --- /dev/null +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.experiments.storm.util + +import java.util.{HashMap => JHashMap, List => JList, Map => JMap} +import scala.collection.JavaConverters._ + +import akka.actor.ExtendedActorSystem +import backtype.storm.utils.Utils +import com.esotericsoftware.kryo.Kryo +import org.scalacheck.Gen +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.experiments.storm.topology.GearpumpTuple +import org.apache.gearpump.experiments.storm.util.StormConstants._ +import org.apache.gearpump.streaming.MockUtil + +class StormSerializerPoolSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + + property("StormSerializerPool should create and manage StormSerializer") { + val taskContext = MockUtil.mockTaskContext + val serializerPool = new StormSerializationFramework + val system = taskContext.system.asInstanceOf[ExtendedActorSystem] + implicit val actorSystem = system + val stormConfig = Utils.readDefaultConfig.asInstanceOf[JMap[AnyRef, AnyRef]] + val config = UserConfig.empty.withValue[JMap[AnyRef, AnyRef]](STORM_CONFIG, stormConfig) + serializerPool.init(system, config) + serializerPool.get shouldBe a[StormSerializer] + } + + property("StormSerializer should serialize and deserialize GearpumpTuple") { + val tupleGen = for { + values <- Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]]) + sourceTaskId <- Gen.chooseNum[Int](0, Int.MaxValue) + sourceStreamId <- Gen.alphaStr + } yield new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, null) + + val kryo = new Kryo + forAll(tupleGen) { (tuple: GearpumpTuple) => + val serializer = new StormSerializer(kryo) + serializer.deserialize(serializer.serialize(tuple)) shouldBe tuple + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormUtilSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormUtilSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormUtilSpec.scala new file mode 100644 index 0000000..36d84cb --- /dev/null +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormUtilSpec.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.experiments.storm.util + +import java.lang.{Boolean => JBoolean, Long => JLong} +import java.util.{HashMap => JHashMap, Map => JMap} +import scala.collection.JavaConverters._ + +import backtype.storm.Config +import backtype.storm.generated.StormTopology +import org.apache.storm.shade.org.json.simple.JSONValue +import org.scalacheck.Gen +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout} +import org.apache.gearpump.experiments.storm.util.StormConstants._ +import org.apache.gearpump.experiments.storm.util.StormUtil._ +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.task.TaskId + +class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + + property("convert Storm task ids to gearpump TaskIds and back") { + val idGen = Gen.chooseNum[Int](0, Int.MaxValue) + forAll(idGen) { (stormTaskId: Int) => + gearpumpTaskIdToStorm(stormTaskIdToGearpump(stormTaskId)) shouldBe stormTaskId + } + + val processorIdGen = Gen.chooseNum[Int](0, Int.MaxValue >> 16) + val indexGen = Gen.chooseNum[Int](0, Int.MaxValue >> 16) + forAll(processorIdGen, indexGen) { (processorId: Int, index: Int) => + val taskId = TaskId(processorId, index) + stormTaskIdToGearpump(gearpumpTaskIdToStorm(taskId)) shouldBe taskId + } + } + + property("get GearpumpStormComponent from user config") { + val taskContext = MockUtil.mockTaskContext + val topology = TopologyUtil.getTestTopology + implicit val actorSystem = taskContext.system + val userConfig = UserConfig.empty + .withValue[StormTopology](STORM_TOPOLOGY, topology) + .withValue[JMap[AnyRef, AnyRef]](STORM_CONFIG, new JHashMap[AnyRef, AnyRef]) + topology.get_spouts.asScala.foreach { case (spoutId, _) => + val config = userConfig.withString(STORM_COMPONENT, spoutId) + val component = getGearpumpStormComponent(taskContext, config)(taskContext.system) + component shouldBe a[GearpumpSpout] + } + topology.get_bolts.asScala.foreach { case (boltId, _) => + val config = userConfig.withString(STORM_COMPONENT, boltId) + val component = getGearpumpStormComponent(taskContext, config)(taskContext.system) + component shouldBe a[GearpumpBolt] + } + } + + property("parse json to map") { + val mapGen = Gen.listOf[String](Gen.alphaStr) + .map(_.map(s => (s, s)).toMap.asJava.asInstanceOf[JMap[AnyRef, AnyRef]]) + + forAll(mapGen) { (map: JMap[AnyRef, AnyRef]) => + parseJsonStringToMap(JSONValue.toJSONString(map)) shouldBe map + } + + val invalidJsonGen: Gen[String] = Gen.oneOf(null, "", "1") + forAll(invalidJsonGen) { (invalidJson: String) => + val map = parseJsonStringToMap(invalidJson) + map shouldBe empty + map shouldBe a[JMap[_, _]] + } + } + + property("get int from config") { + val name = "int" + val conf: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef] + getInt(conf, name) shouldBe None + conf.put(name, null) + getInt(conf, name) shouldBe None + + forAll(Gen.chooseNum[Int](Int.MinValue, Int.MaxValue)) { (int: Int) => + conf.put(name, new Integer(int)) + getInt(conf, name) shouldBe Some(int) + } + + forAll(Gen.chooseNum[Long](Int.MinValue, Int.MaxValue)) { (long: Long) => + conf.put(name, new JLong(long)) + getInt(conf, name) shouldBe Some(long) + } + + forAll(Gen.alphaStr) { (s: String) => + conf.put(name, s) + an[IllegalArgumentException] should be thrownBy getInt(conf, name) + } + } + + property("get boolean from config") { + val name = "boolean" + val conf: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef] + getBoolean(conf, name) shouldBe None + conf.put(name, null) + getBoolean(conf, name) shouldBe None + + forAll(Gen.oneOf(true, false)) { (boolean: Boolean) => + conf.put(name, new JBoolean(boolean)) + getBoolean(conf, name) shouldBe Some(boolean) + } + + forAll(Gen.alphaStr) { (s: String) => + conf.put(name, s) + an[IllegalArgumentException] should be thrownBy getBoolean(conf, name) + } + } + + property("mod should be correct") { + mod(10, 5) shouldBe 0 + mod(10, 6) shouldBe 4 + mod(10, -3) shouldBe -2 + mod(-2, 5) shouldBe 3 + mod(-1, -2) shouldBe -1 + } + + property("get whether ack enabled") { + val conf: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef] + ackEnabled(conf) shouldBe false + conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, new Integer(0)) + ackEnabled(conf) shouldBe false + conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, null) + ackEnabled(conf) shouldBe true + forAll(Gen.chooseNum[Int](Int.MinValue, Int.MaxValue)) { + (ackers: Int) => + conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, new Integer(ackers)) + if (ackers == 0) { + ackEnabled(conf) shouldBe false + } else { + ackEnabled(conf) shouldBe true + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/TopologyUtil.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/TopologyUtil.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/TopologyUtil.scala new file mode 100644 index 0000000..886013c --- /dev/null +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/TopologyUtil.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.experiments.storm.util + +import backtype.storm.generated.StormTopology +import backtype.storm.testing.{TestGlobalCount, TestWordCounter, TestWordSpout} +import backtype.storm.topology.TopologyBuilder +import backtype.storm.tuple.Fields +import backtype.storm.utils.Utils + +object TopologyUtil { + val DEFAULT_STREAM_ID = Utils.DEFAULT_STREAM_ID + val DEFAULT_COMPONENT_ID = "component" + + def getTestTopology: StormTopology = { + val topologyBuilder = new TopologyBuilder + topologyBuilder.setSpout("1", new TestWordSpout(true), 5) + topologyBuilder.setSpout("2", new TestWordSpout(true), 3) + topologyBuilder.setBolt("3", new TestWordCounter(), 3) + .fieldsGrouping("1", new Fields("word")) + .fieldsGrouping("2", new Fields("word")) + topologyBuilder.setBolt("4", new TestGlobalCount()).globalGrouping("1") + topologyBuilder.createTopology() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala deleted file mode 100644 index 6618b48..0000000 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.yarn - -object Constants { - val APPMASTER_NAME = "gearpump.yarn.applicationmaster.name" - val APPMASTER_COMMAND = "gearpump.yarn.applicationmaster.command" - val APPMASTER_MEMORY = "gearpump.yarn.applicationmaster.memory" - val APPMASTER_VCORES = "gearpump.yarn.applicationmaster.vcores" - val APPMASTER_QUEUE = "gearpump.yarn.applicationmaster.queue" - - val PACKAGE_PATH = "gearpump.yarn.client.package-path" - val CONFIG_PATH = "gearpump.yarn.client.config-path" - - val MASTER_COMMAND = "gearpump.yarn.master.command" - val MASTER_MEMORY = "gearpump.yarn.master.memory" - val MASTER_VCORES = "gearpump.yarn.master.vcores" - - val WORKER_COMMAND = "gearpump.yarn.worker.command" - val WORKER_CONTAINERS = "gearpump.yarn.worker.containers" - val WORKER_MEMORY = "gearpump.yarn.worker.memory" - val WORKER_VCORES = "gearpump.yarn.worker.vcores" - - val SERVICES_ENABLED = "gearpump.yarn.services.enabled" - - val LOCAL_DIRS = org.apache.hadoop.yarn.api.ApplicationConstants.Environment.LOCAL_DIRS.$$() - val CONTAINER_ID = org.apache.hadoop.yarn.api.ApplicationConstants.Environment.CONTAINER_ID.$$() - val LOG_DIR_EXPANSION_VAR = org.apache.hadoop.yarn.api.ApplicationConstants.LOG_DIR_EXPANSION_VAR - val NODEMANAGER_HOST = org.apache.hadoop.yarn.api.ApplicationConstants.Environment.NM_HOST.$$() -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala deleted file mode 100644 index af871ab..0000000 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.yarn.appmaster - -import com.typesafe.config.Config - -import io.gearpump.cluster.main.{Master, Worker} -import io.gearpump.experiments.yarn.Constants._ -import io.gearpump.transport.HostPort -import io.gearpump.util.Constants - -/** Command to start a YARN container */ -trait Command { - def get: String - override def toString: String = get -} - -abstract class AbstractCommand extends Command { - protected def config: Config - def version: String - def classPath: Array[String] = { - Array( - s"conf", - s"pack/$version/conf", - s"pack/$version/lib/daemon/*", - s"pack/$version/lib/*" - ) - } - - protected def buildCommand( - java: String, properties: Array[String], mainClazz: String, cliOpts: Array[String]) - : String = { - val exe = config.getString(java) - - s"$exe -cp ${classPath.mkString(":")}:" + - "$CLASSPATH " + properties.mkString(" ") + - s" $mainClazz ${cliOpts.mkString(" ")} 2>&1 | /usr/bin/tee -a ${LOG_DIR_EXPANSION_VAR}/stderr" - } - - protected def clazz(any: AnyRef): String = { - val name = any.getClass.getName - if (name.endsWith("$")) { - name.dropRight(1) - } else { - name - } - } -} - -case class MasterCommand(config: Config, version: String, masterAddr: HostPort) - extends AbstractCommand { - - def get: String = { - val masterArguments = Array(s"-ip ${masterAddr.host}", s"-port ${masterAddr.port}") - - val properties = Array( - s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=${masterAddr.host}:${masterAddr.port}", - s"-D${Constants.GEARPUMP_HOSTNAME}=${masterAddr.host}", - s"-D${Constants.GEARPUMP_MASTER_RESOURCE_MANAGER_CONTAINER_ID}=${CONTAINER_ID}", - s"-D${Constants.GEARPUMP_HOME}=${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version", - s"-D${Constants.GEARPUMP_LOG_DAEMON_DIR}=${LOG_DIR_EXPANSION_VAR}", - s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}") - - buildCommand(MASTER_COMMAND, properties, clazz(Master), masterArguments) - } -} - -case class WorkerCommand(config: Config, version: String, masterAddr: HostPort, workerHost: String) - extends AbstractCommand { - - def get: String = { - val properties = Array( - s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=${masterAddr.host}:${masterAddr.port}", - s"-D${Constants.GEARPUMP_LOG_DAEMON_DIR}=${LOG_DIR_EXPANSION_VAR}", - s"-D${Constants.GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID}=${CONTAINER_ID}", - s"-D${Constants.GEARPUMP_HOME}=${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version", - s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}", - s"-D${Constants.GEARPUMP_HOSTNAME}=$workerHost") - - buildCommand(WORKER_COMMAND, properties, clazz(Worker), Array.empty[String]) - } -} - -case class AppMasterCommand(config: Config, version: String, args: Array[String]) - extends AbstractCommand { - - override val classPath = Array( - "conf", - s"pack/$version/conf", - s"pack/$version/dashboard", - s"pack/$version/lib/*", - s"pack/$version/lib/daemon/*", - s"pack/$version/lib/services/*", - s"pack/$version/lib/yarn/*" - ) - - def get: String = { - val properties = Array( - s"-D${Constants.GEARPUMP_HOME}=${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version", - s"-D${Constants.GEARPUMP_FULL_SCALA_VERSION}=$version", - s"-D${Constants.GEARPUMP_LOG_DAEMON_DIR}=${LOG_DIR_EXPANSION_VAR}", - s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}", - s"-D${Constants.GEARPUMP_HOSTNAME}=${NODEMANAGER_HOST}") - - val arguments = Array(s"") ++ args - - buildCommand(APPMASTER_COMMAND, properties, clazz(YarnAppMaster), - arguments) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala deleted file mode 100644 index 6dd5011..0000000 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.yarn.appmaster - -import scala.concurrent.Future - -import akka.actor._ -import com.typesafe.config.{ConfigFactory, ConfigValueFactory} - -import io.gearpump.cluster.ClusterConfig -import io.gearpump.services.main.Services -import io.gearpump.transport.HostPort -import io.gearpump.util.{ActorUtil, Constants, LogUtil} - -trait UIFactory { - def props(masters: List[HostPort], host: String, port: Int): Props -} - -/** Wrapper of UI server */ -class UIService(masters: List[HostPort], host: String, port: Int) extends Actor { - private val LOG = LogUtil.getLogger(getClass) - - private val supervisor = ActorUtil.getFullPath(context.system, context.parent.path) - private var configFile: java.io.File = null - - private implicit val dispatcher = context.dispatcher - - override def postStop(): Unit = { - if (configFile != null) { - configFile.delete() - configFile = null - } - - // TODO: fix this - // Hack around to Kill the UI server - Services.kill() - } - - override def preStart(): Unit = { - Future(start()) - } - - def start(): Unit = { - val mastersArg = masters.mkString(",") - LOG.info(s"Launching services -master $mastersArg") - - configFile = java.io.File.createTempFile("uiserver", ".conf") - - val config = context.system.settings.config. - withValue(Constants.GEARPUMP_SERVICE_HOST, ConfigValueFactory.fromAnyRef(host)). - withValue(Constants.GEARPUMP_SERVICE_HTTP, ConfigValueFactory.fromAnyRef(port.toString)). - withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(host)) - - ClusterConfig.saveConfig(config, configFile) - - val master = masters.head - - ConfigFactory.invalidateCaches() - launch(supervisor, master.host, master.port, configFile.toString) - } - - // Launch the UI server - def launch(supervisor: String, masterHost: String, masterPort: Int, configFile: String): Unit = { - Services.main(Array("-supervisor", supervisor, "-master", s"$masterHost:$masterPort" - , "-conf", configFile)) - } - - override def receive: Receive = { - case _ => - LOG.error(s"Unknown message received") - } -} - -object UIService extends UIFactory { - override def props(masters: List[HostPort], host: String, port: Int): Props = { - Props(new UIService(masters, host, port)) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala deleted file mode 100644 index 1df7fb9..0000000 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala +++ /dev/null @@ -1,385 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.yarn.appmaster - -import java.io.IOException -import java.util.concurrent.TimeUnit -import scala.collection.JavaConverters._ -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor._ -import akka.util.Timeout -import com.typesafe.config.ConfigValueFactory -import org.apache.commons.httpclient.HttpClient -import org.apache.commons.httpclient.methods.GetMethod -import org.slf4j.Logger - -import io.gearpump.cluster.ClientToMaster._ -import io.gearpump.cluster.ClusterConfig -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import io.gearpump.experiments.yarn.Constants._ -import io.gearpump.experiments.yarn.glue.Records._ -import io.gearpump.experiments.yarn.glue.{NMClient, RMClient, YarnConfig} -import io.gearpump.transport.HostPort -import io.gearpump.util._ - -/** - * YARN AppMaster. YARN AppMaster is responsible to start Gearpump masters, workers, UI server as - * YARN containers. - * - * NOTE: It is different with Gearpump AppMaster. Gearpump AppMaster is a sub-process of worker. - */ -class YarnAppMaster(rmClient: RMClient, nmClient: NMClient, - packagePath: String, hdfsConfDir: String, - uiFactory: UIFactory) - extends Actor { - - private val LOG: Logger = LogUtil.getLogger(getClass) - private val akkaConf = context.system.settings.config - private val servicesEnabled = akkaConf.getString(SERVICES_ENABLED).toBoolean - private var uiStarted = false - private val host = akkaConf.getString(Constants.GEARPUMP_HOSTNAME) - - private val port = Util.findFreePort().get - - private val trackingURL = "http://" + host + ":" + port - - // TODO: for now, only one master is supported. - private val masterCount = 1 - private val masterMemory = akkaConf.getString(MASTER_MEMORY).toInt - private val masterVCores = akkaConf.getString(MASTER_VCORES).toInt - - private var workerCount = akkaConf.getString(WORKER_CONTAINERS).toInt - private val workerMemory = akkaConf.getString(WORKER_MEMORY).toInt - private val workerVCores = akkaConf.getString(WORKER_VCORES).toInt - - val rootPath = System.getProperty(Constants.GEARPUMP_FULL_SCALA_VERSION) - - rmClient.start(self) - nmClient.start(self) - - def receive: Receive = null - - private def registerAppMaster(): Unit = { - val target = host + ":" + port - rmClient.registerAppMaster(host, port, trackingURL) - } - - registerAppMaster - context.become(waitForAppMasterRegistered) - - import io.gearpump.experiments.yarn.appmaster.YarnAppMaster._ - - def waitForAppMasterRegistered: Receive = { - case AppMasterRegistered => - LOG.info("YarnAppMaster registration completed") - requestMasterContainers(masterCount) - context.become(startingMasters(remain = masterCount, List.empty[MasterInfo])) - } - - private def startingMasters(remain: Int, masters: List[MasterInfo]): Receive = box { - case ContainersAllocated(containers) => - LOG.info(s"ContainersAllocated: containers allocated for master(remain=$remain), count: " - + containers.size) - val count = Math.min(containers.length, remain) - val newMasters = (0 until count).toList.map { index => - val container = containers(index) - MasterInfo(container.getId, container.getNodeId, launchMaster(container)) - } - - // Stops un-used containers - containers.drop(count).map { container => - nmClient.stopContainer(container.getId, container.getNodeId) - } - - context.become(startingMasters(remain, newMasters ++ masters)) - case ContainerStarted(containerId) => - LOG.info(s"ContainerStarted: container ${containerId} started for master(remain=$remain) ") - if (remain > 1) { - context.become(startingMasters(remain - 1, masters)) - } else { - requestWorkerContainers(workerCount) - context.become(startingWorkers(workerCount, masters, List.empty[WorkerInfo])) - } - } - - private def box(receive: Receive): Receive = { - onError orElse receive orElse unHandled - } - - private def startingWorkers(remain: Int, masters: List[MasterInfo], workers: List[WorkerInfo]) - : Receive = { - box { - case ContainersAllocated(containers) => - LOG.info(s"ContainersAllocated: containers allocated for workers(remain=$remain), " + - s"count: " + containers.size) - - val count = Math.min(containers.length, remain) - val newWorkers = (0 until count).toList.map { index => - val container = containers(index) - launchWorker(container, masters) - WorkerInfo(container.getId, container.getNodeId) - } - - // Stops un-used containers - containers.drop(count).map { container => - nmClient.stopContainer(container.getId, container.getNodeId) - } - context.become(startingWorkers(remain, masters, workers ++ newWorkers)) - case ContainerStarted(containerId) => - LOG.info(s"ContainerStarted: container $containerId started for worker(remain=$remain)") - // The last one - if (remain > 1) { - context.become(startingWorkers(remain - 1, masters, workers)) - } else { - if (servicesEnabled && !uiStarted) { - context.actorOf(uiFactory.props(masters.map(_.host), host, port)) - uiStarted = true - } - context.become(service(effectiveConfig(masters.map(_.host)), masters, workers)) - } - } - } - - private def effectiveConfig(masters: List[HostPort]): Config = { - val masterList = masters.map(pair => s"${pair.host}:${pair.port}") - val config = context.system.settings.config - config.withValue(Constants.GEARPUMP_CLUSTER_MASTERS, - ConfigValueFactory.fromIterable(masterList.asJava)) - } - - private def onError: Receive = { - case ContainersCompleted(containers) => - // TODO: we should recover the failed container from this... - containers.foreach { status => - if (status.getExitStatus != 0) { - LOG.error(s"ContainersCompleted: container ${status.getContainerId}" + - s" failed with exit code ${status.getExitStatus}, msg: ${status.getDiagnostics}") - } else { - LOG.info(s"ContainersCompleted: container ${status.getContainerId} completed") - } - } - case ShutdownApplication => - LOG.error("ShutdownApplication") - nmClient.stop() - rmClient.shutdownApplication() - context.stop(self) - case ResourceManagerException(ex) => - LOG.error("ResourceManagerException: " + ex.getMessage, ex) - nmClient.stop() - rmClient.failApplication(ex) - context.stop(self) - case Kill => - LOG.info("Kill: User asked to shutdown the application") - sender ! CommandResult(success = true) - self ! ShutdownApplication - } - - private def service(config: Config, masters: List[MasterInfo], workers: List[WorkerInfo]) - : Receive = box { - case GetActiveConfig(clientHost) => - LOG.info("GetActiveConfig: Get active configuration for client: " + clientHost) - val filtered = ClusterConfig.filterOutDefaultConfig( - config.withValue(Constants.GEARPUMP_HOSTNAME, - ConfigValueFactory.fromAnyRef(clientHost))) - sender ! ActiveConfig(filtered) - case QueryVersion => - LOG.info("QueryVersion") - sender ! Version(Util.version) - case QueryClusterInfo => - LOG.info("QueryClusterInfo") - val masterContainers = masters.map { master => - master.id.toString + s"(${master.nodeId.toString})" - } - - val workerContainers = workers.map { worker => - worker.id.toString + s"(${worker.nodeId.toString})" - } - sender ! ClusterInfo(masterContainers, workerContainers) - case AddMaster => - sender ! CommandResult(success = false, "Not Implemented") - case RemoveMaster(masterId) => - sender ! CommandResult(success = false, "Not Implemented") - case AddWorker(count) => - if (count == 0) { - sender ! CommandResult(success = true) - } else { - LOG.info("AddWorker: Start to add new workers, count: " + count) - workerCount += count - requestWorkerContainers(count) - context.become(startingWorkers(count, masters, workers)) - sender ! CommandResult(success = true) - } - case RemoveWorker(worker) => - val workerId = ContainerId.fromString(worker) - LOG.info(s"RemoveWorker: remove worker $workerId") - val info = workers.find(_.id.toString == workerId.toString) - if (info.isDefined) { - nmClient.stopContainer(info.get.id, info.get.nodeId) - sender ! CommandResult(success = true) - val remainWorkers = workers.filter(_.id != info.get.id) - context.become(service(config, masters, remainWorkers)) - } else { - sender ! CommandResult(success = false, "failed to find worker " + worker) - } - } - - private def unHandled: Receive = { - case other => - LOG.info(s"Received unknown message $other") - } - - private def requestMasterContainers(masters: Int) = { - LOG.info(s"Request resource for masters($masters)") - val containers = (1 to masters).map( - i => Resource.newInstance(masterMemory, masterVCores) - ).toList - rmClient.requestContainers(containers) - } - - private def launchMaster(container: Container): HostPort = { - LOG.info(s"Launch Master on container " + container.getNodeHttpAddress) - val host = container.getNodeId.getHost - - val port = Util.findFreePort().get - - LOG.info("=============PORT" + port) - val masterCommand = MasterCommand(akkaConf, rootPath, HostPort(host, port)) - nmClient.launchCommand(container, masterCommand.get, packagePath, hdfsConfDir) - HostPort(host, port) - } - - private def requestWorkerContainers(workers: Int): Unit = { - LOG.info(s"Request resource for workers($workers)") - val containers = (1 to workers).map( - i => Resource.newInstance(workerMemory, workerVCores) - ).toList - - rmClient.requestContainers(containers) - } - - private def launchWorker(container: Container, masters: List[MasterInfo]): Unit = { - LOG.info(s"Launch Worker on container " + container.getNodeHttpAddress) - val workerHost = container.getNodeId.getHost - val workerCommand = WorkerCommand(akkaConf, rootPath, masters.head.host, workerHost) - nmClient.launchCommand(container, workerCommand.get, packagePath, hdfsConfDir) - } -} - -object YarnAppMaster extends AkkaApp with ArgumentsParser { - val LOG: Logger = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = Array( - "conf" -> CLIOption[String]("<Gearpump configuration directory on HDFS>", required = true), - "package" -> CLIOption[String]("<Gearpump package path on HDFS>", required = true) - ) - - override def akkaConfig: Config = { - ClusterConfig.ui() - } - - override def main(akkaConf: Config, args: Array[String]): Unit = { - implicit val timeout = Timeout(5, TimeUnit.SECONDS) - implicit val system = ActorSystem("GearpumpAM", akkaConf) - - val yarnConf = new YarnConfig() - - val confDir = parse(args).getString("conf") - val packagePath = parse(args).getString("package") - - LOG.info("HADOOP_CONF_DIR: " + System.getenv("HADOOP_CONF_DIR")) - LOG.info("YARN Resource Manager: " + yarnConf.resourceManager) - - val rmClient = new RMClient(yarnConf) - val nmClient = new NMClient(yarnConf, akkaConf) - val appMaster = system.actorOf(Props(new YarnAppMaster(rmClient, - nmClient, packagePath, confDir, UIService))) - - val daemon = system.actorOf(Props(new Daemon(appMaster))) - Await.result(system.whenTerminated, Duration.Inf) - LOG.info("YarnAppMaster is shutdown") - } - - class Daemon(appMaster: ActorRef) extends Actor { - context.watch(appMaster) - - override def receive: Actor.Receive = { - case Terminated(actor) => - if (actor.compareTo(appMaster) == 0) { - LOG.info(s"YarnAppMaster ${appMaster.path.toString} is terminated, " + - s"shutting down current ActorSystem") - context.system.terminate() - context.stop(self) - } - } - } - - case class ResourceManagerException(throwable: Throwable) - case object ShutdownApplication - case class ContainersRequest(containers: List[Resource]) - case class ContainersAllocated(containers: List[Container]) - case class ContainersCompleted(containers: List[ContainerStatus]) - case class ContainerStarted(containerId: ContainerId) - case object AppMasterRegistered - - case class GetActiveConfig(clientHost: String) - - case object QueryClusterInfo - case class ClusterInfo(masters: List[String], workers: List[String]) { - override def toString: String = { - val separator = "\n" - val masterSection = "masters: " + separator + masters.mkString("\n") + "\n" - - val workerSection = "workers: " + separator + workers.mkString("\n") + "\n" - masterSection + workerSection - } - } - - case object Kill - case class ActiveConfig(config: Config) - - case object QueryVersion - - case class Version(version: String) - - case class MasterInfo(id: ContainerId, nodeId: NodeId, host: HostPort) - - case class WorkerInfo(id: ContainerId, nodeId: NodeId) - - def getAppMaster(report: ApplicationReport, system: ActorSystem): ActorRef = { - val client = new HttpClient() - val appMasterPath = s"${report.getOriginalTrackingUrl}/supervisor-actor-path" - val get = new GetMethod(appMasterPath) - var status = client.executeMethod(get) - - if (status != 200) { - // Sleeps a little bit, and try again - Thread.sleep(3000) - status = client.executeMethod(get) - } - - if (status == 200) { - AkkaHelper.actorFor(system, get.getResponseBodyAsString) - } else { - throw new IOException("Fail to resolve AppMaster address, please make sure " + - s"${report.getOriginalTrackingUrl} is accessible...") - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala deleted file mode 100644 index 49ec3a0..0000000 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.yarn.client - -import java.io.IOException -import scala.util.Try - -import akka.actor.{ActorRef, ActorSystem} -import org.apache.commons.httpclient.HttpClient -import org.apache.commons.httpclient.methods.GetMethod - -import io.gearpump.experiments.yarn.glue.Records.ApplicationId -import io.gearpump.experiments.yarn.glue.YarnClient -import io.gearpump.util.{AkkaHelper, LogUtil} - -/** - * Resolves AppMaster ActorRef - */ -class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) { - val LOG = LogUtil.getLogger(getClass) - val RETRY_INTERVAL_MS = 3000 // ms - - def resolve(appId: ApplicationId, timeoutSeconds: Int = 30): ActorRef = { - val appMaster = retry(connect(appId), 1 + timeoutSeconds * 1000 / RETRY_INTERVAL_MS) - appMaster - } - - private def connect(appId: ApplicationId): ActorRef = { - val report = yarnClient.getApplicationReport(appId) - val client = new HttpClient() - val appMasterPath = s"${report.getOriginalTrackingUrl}/supervisor-actor-path" - LOG.info(s"appMasterPath=$appMasterPath") - val get = new GetMethod(appMasterPath) - val status = client.executeMethod(get) - if (status == 200) { - val response = get.getResponseBodyAsString - LOG.info("Successfully resolved AppMaster address: " + response) - AkkaHelper.actorFor(system, response) - } else { - throw new IOException("Fail to resolve AppMaster address, please make sure " + - s"${report.getOriginalTrackingUrl} is accessible...") - } - } - - private def retry(fun: => ActorRef, times: Int): ActorRef = { - var index = 0 - var result: ActorRef = null - while (index < times && result == null) { - Thread.sleep(RETRY_INTERVAL_MS) - index += 1 - val tryConnect = Try(fun) - if (tryConnect.isFailure) { - LOG.error(s"Failed to connect YarnAppMaster(tried $index)... " + - tryConnect.failed.get.getMessage) - } else { - result = tryConnect.get - } - } - result - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala deleted file mode 100644 index 7c3bc38..0000000 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.experiments.yarn.client -import org.slf4j.Logger - -import io.gearpump.util.LogUtil - -/** Command line tool to launch a Gearpump cluster on YARN, and also to manage Gearpump cluster */ -object Client { - - private val LOG: Logger = LogUtil.getLogger(getClass) - val LAUNCH = "launch" - - val commands = Map(LAUNCH -> LaunchCluster) ++ - ManageCluster.commands.map(key => (key, ManageCluster)).toMap - - def usage(): Unit = { - val keys = commands.keys.toList.sorted - // scalastyle:off println - Console.err.println("Usage: " + "<" + keys.mkString("|") + ">") - // scalastyle:on println - } - - def main(args: Array[String]): Unit = { - if (args.length == 0) { - usage() - } else { - val key = args(0) - val command = commands.get(key) - command match { - case Some(command) => - if (key == LAUNCH) { - val remainArgs = args.drop(1) - command.main(remainArgs) - } else { - val commandArg = Array("-" + ManageCluster.COMMAND, key) - val remainArgs = args.drop(1) - val updatedArgs = commandArg ++ args.drop(1) - command.main(updatedArgs) - } - case None => - usage - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala deleted file mode 100644 index ea5e707..0000000 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.experiments.yarn.client - -import java.io.{File, IOException, OutputStreamWriter} -import java.net.InetAddress -import java.util.zip.ZipInputStream -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, Future} - -import akka.actor.ActorSystem -import com.typesafe.config.{Config, ConfigValueFactory} -import org.slf4j.Logger - -import io.gearpump.cluster.ClusterConfig -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import io.gearpump.experiments.yarn -import io.gearpump.experiments.yarn.Constants -import io.gearpump.experiments.yarn.appmaster.AppMasterCommand -import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, GetActiveConfig} -import io.gearpump.experiments.yarn.glue.Records.{ApplicationId, Resource} -import io.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig} -import io.gearpump.util.ActorUtil.askActor -import io.gearpump.util.{AkkaApp, LogUtil, Util} - -/** - * Launch Gearpump on YARN - */ -class LaunchCluster( - akka: Config, - yarnConf: YarnConfig, - yarnClient: YarnClient, - fs: FileSystem, - actorSystem: ActorSystem, - appMasterResolver: AppMasterResolver, - version: String = Util.version) { - - import io.gearpump.experiments.yarn.Constants._ - private implicit val dispatcher = actorSystem.dispatcher - - private val LOG: Logger = LogUtil.getLogger(getClass) - private val host = InetAddress.getLocalHost.getHostName - private val queue = akka.getString(APPMASTER_QUEUE) - private val memory = akka.getString(APPMASTER_MEMORY).toInt - private val vcore = akka.getString(APPMASTER_VCORES).toInt - - def submit(appName: String, packagePath: String): ApplicationId = { - LOG.info("Starting AM") - - // First step, check the version, to make sure local version matches remote version - if (!packagePath.endsWith(".zip")) { - throw new IOException(s"YarnClient only support .zip distribution package," + - s" now it is ${packagePath}. Please download the zip " + - "package from website or use sbt assembly packArchiveZip to build one.") - } - - if (!fs.exists(packagePath)) { - throw new IOException(s"Cannot find package ${packagePath} on HDFS ${fs.name}. ") - } - - val rootEntry = rootEntryPath(zip = packagePath) - - if (!rootEntry.contains(version)) { - throw new IOException(s"Check version failed! Local gearpump binary" + - s" version $version doesn't match with remote path $packagePath") - } - - val resource = Resource.newInstance(memory, vcore) - val appId = yarnClient.createApplication - - // uploads the configs to HDFS home directory of current user. - val configPath = uploadConfigToHDFS(appId) - - val command = AppMasterCommand(akka, rootEntry, Array(s"-conf $configPath", - s"-package $packagePath")) - - yarnClient.submit(appName, appId, command.get, resource, queue, packagePath, configPath) - - LOG.info("Waiting application to finish...") - val report = yarnClient.awaitApplication(appId, LaunchCluster.TIMEOUT_MILLISECONDS) - LOG.info(s"Application $appId finished with state ${report.getYarnApplicationState} " + - s"at ${report.getFinishTime}, info: ${report.getDiagnostics}") - - // scalastyle:off println - Console.println("================================================") - Console.println("==Application Id: " + appId) - // scalastyle:on println - appId - } - - def saveConfig(appId: ApplicationId, output: String): Future[File] = { - LOG.info(s"Trying to download active configuration to output path: " + output) - LOG.info(s"Resolving YarnAppMaster ActorRef for application " + appId) - val appMaster = appMasterResolver.resolve(appId) - LOG.info(s"appMaster=${appMaster.path} host=$host") - val future = askActor[ActiveConfig](appMaster, GetActiveConfig(host)).map(_.config) - future.map { config => - val out = new File(output) - ClusterConfig.saveConfig(config, out) - out - } - } - - private def uploadConfigToHDFS(appId: ApplicationId): String = { - // Uses personal home directory so that it will not conflict with other users - // conf path pattern: /user/<userid>/.gearpump_application_<timestamp>_<id>/conf - val confDir = s"${fs.getHomeDirectory}/.gearpump_${appId}/conf/" - LOG.info(s"Uploading configuration files to remote HDFS(under $confDir)...") - - // Copies config from local to remote. - val remoteConfFile = s"$confDir/gear.conf" - var out = fs.create(remoteConfFile) - var writer = new OutputStreamWriter(out) - - val cleanedConfig = ClusterConfig.filterOutDefaultConfig(akka) - - writer.write(cleanedConfig.root().render()) - writer.close() - - // Saves yarn-site.xml to remote - val yarn_site_xml = s"$confDir/yarn-site.xml" - out = fs.create(yarn_site_xml) - writer = new OutputStreamWriter(out) - yarnConf.writeXml(writer) - writer.close() - - // Saves log4j.properties to remote - val log4j_properties = s"$confDir/log4j.properties" - val log4j = LogUtil.loadConfiguration - out = fs.create(log4j_properties) - writer = new OutputStreamWriter(out) - log4j.store(writer, "gearpump on yarn") - writer.close() - confDir.toString - } - - private def rootEntryPath(zip: String): String = { - val stream = new ZipInputStream(fs.open(zip)) - val entry = stream.getNextEntry() - val name = entry.getName - name.substring(0, entry.getName.indexOf("/")) - } -} - -object LaunchCluster extends AkkaApp with ArgumentsParser { - - val PACKAGE = "package" - val NAME = "name" - val VERBOSE = "verbose" - val OUTPUT = "output" - - override protected def akkaConfig: Config = { - ClusterConfig.default() - } - - override val options: Array[(String, CLIOption[Any])] = Array( - PACKAGE -> CLIOption[String]("<Please specify the gearpump.zip package path on HDFS. " + - "If not specified, we will use default value /user/gearpump/gearpump.zip>", required = false), - NAME -> CLIOption[String]("<Application name showed in YARN>", required = false, - defaultValue = Some("Gearpump")), - VERBOSE -> CLIOption("<print verbose log on console>", required = false, - defaultValue = Some(false)), - OUTPUT -> CLIOption("<output path for configuration file>", required = false, - defaultValue = None) - ) - private val TIMEOUT_MILLISECONDS = 30 * 1000 - - override def main(inputAkkaConf: Config, args: Array[String]): Unit = { - val parsed = parse(args) - if (parsed.getBoolean(VERBOSE)) { - LogUtil.verboseLogToConsole() - } - - val yarnConfig = new YarnConfig() - val fs = new FileSystem(yarnConfig) - val yarnClient = new YarnClient(yarnConfig) - val akkaConf = updateConf(inputAkkaConf, parsed) - val actorSystem = ActorSystem("launchCluster", akkaConf) - val appMasterResolver = new AppMasterResolver(yarnClient, actorSystem) - - val client = new LaunchCluster(akkaConf, yarnConfig, yarnClient, fs, - actorSystem, appMasterResolver) - - val name = parsed.getString(NAME) - val appId = client.submit(name, akkaConf.getString(Constants.PACKAGE_PATH)) - - if (parsed.exists(OUTPUT)) { - import scala.concurrent.duration._ - Await.result(client.saveConfig(appId, parsed.getString(OUTPUT)), - TIMEOUT_MILLISECONDS.milliseconds) - } - - yarnClient.stop() - actorSystem.terminate() - Await.result(actorSystem.whenTerminated, Duration.Inf) - } - - private def updateConf(akka: Config, parsed: ParseResult): Config = { - if (parsed.exists(PACKAGE)) { - akka.withValue(Constants.PACKAGE_PATH, - ConfigValueFactory.fromAnyRef(parsed.getString(PACKAGE))) - } else { - akka - } - } -} \ No newline at end of file
