http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala index eb2bc6e..319ee27 100644 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,25 +17,28 @@ */ package io.gearpump.experiments.storm.topology -import akka.actor.ActorRef -import backtype.storm.spout.{SpoutOutputCollector, ISpout} -import backtype.storm.task.{OutputCollector, IBolt, GeneralTopologyContext, TopologyContext} import java.util.{Map => JMap} + +import akka.actor.ActorRef +import backtype.storm.spout.{ISpout, SpoutOutputCollector} +import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, TopologyContext} import backtype.storm.tuple.Tuple -import io.gearpump.experiments.storm.producer.StormSpoutOutputCollector -import io.gearpump.{TimeStamp, Message} -import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout} -import io.gearpump.experiments.storm.util.StormOutputCollector -import io.gearpump.streaming.{MockUtil, DAG} -import io.gearpump.streaming.task.{TaskContext, StartTime, TaskId} +import org.mockito.Matchers.{anyObject, eq => mockitoEq} import org.mockito.Mockito._ -import org.mockito.Matchers.{anyLong, anyObject, eq => mockitoEq} import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks -import org.scalatest.{PropSpec, Matchers} +import org.scalatest.{Matchers, PropSpec} -class GearpumpStormComponentSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { +import io.gearpump.experiments.storm.producer.StormSpoutOutputCollector +import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout} +import io.gearpump.experiments.storm.util.StormOutputCollector +import io.gearpump.streaming.task.{StartTime, TaskContext, TaskId} +import io.gearpump.streaming.{DAG, MockUtil} +import io.gearpump.{Message, TimeStamp} + +class GearpumpStormComponentSpec + extends PropSpec with PropertyChecks with Matchers with MockitoSugar { property("GearpumpSpout lifecycle") { val config = mock[JMap[AnyRef, AnyRef]] @@ -56,13 +59,14 @@ class GearpumpStormComponentSpec extends PropSpec with PropertyChecks with Match val gearpumpSpout = GearpumpSpout(config, spout, getDAG, getTopologyContext, getOutputCollector, ackEnabled = false, taskContext) - // start + // Start val startTime = mock[StartTime] gearpumpSpout.start(startTime) - verify(spout).open(mockitoEq(config), mockitoEq(topologyContext), anyObject[SpoutOutputCollector]) + verify(spout).open(mockitoEq(config), mockitoEq(topologyContext), + anyObject[SpoutOutputCollector]) - // next + // Next val message = mock[Message] gearpumpSpout.next(message) @@ -93,16 +97,17 @@ class GearpumpStormComponentSpec extends PropSpec with PropertyChecks with Match val getTickTuple = mock[(GeneralTopologyContext, Int) => Tuple] val tickTuple = mock[Tuple] when(getTickTuple(mockitoEq(generalTopologyContext), anyObject[Int]())).thenReturn(tickTuple) - val gearpumpBolt = GearpumpBolt(config, bolt, getDAG, getTopologyContext, getGeneralTopologyContext, - getOutputCollector, getTickTuple, taskContext) + val gearpumpBolt = GearpumpBolt(config, bolt, getDAG, getTopologyContext, + getGeneralTopologyContext, getOutputCollector, getTickTuple, taskContext) - // start + // Start val startTime = mock[StartTime] gearpumpBolt.start(startTime) - verify(bolt).prepare(mockitoEq(config), mockitoEq(topologyContext), anyObject[OutputCollector]) + verify(bolt).prepare(mockitoEq(config), mockitoEq(topologyContext), + anyObject[OutputCollector]) - // next + // Next val gearpumpTuple = mock[GearpumpTuple] val tuple = mock[Tuple] when(gearpumpTuple.toTuple(generalTopologyContext, timestamp)).thenReturn(tuple) @@ -112,11 +117,9 @@ class GearpumpStormComponentSpec extends PropSpec with PropertyChecks with Match verify(stormOutputCollector).setTimestamp(timestamp) verify(bolt).execute(tuple) - - // tick + // Tick gearpumpBolt.tick(freq) verify(bolt).execute(tickTuple) } } - }
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala index 8f10886..f9ffc5f 100644 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,16 +19,16 @@ package io.gearpump.experiments.storm.topology import java.util.{HashMap => JHashMap, Map => JMap} +import scala.collection.JavaConverters._ import backtype.storm.Config +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, WordSpec} + import io.gearpump.experiments.storm.processor.StormProcessor import io.gearpump.experiments.storm.producer.StormProducer import io.gearpump.experiments.storm.util.TopologyUtil import io.gearpump.streaming.MockUtil -import org.scalatest.mock.MockitoSugar -import org.scalatest.{Matchers, WordSpec} - -import scala.collection.JavaConversions._ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar { import io.gearpump.experiments.storm.topology.GearpumpStormTopologySpec._ @@ -43,18 +43,19 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar val appConfig = newJavaConfig(name, appVal) implicit val system = MockUtil.system - val topology1 = new GearpumpStormTopology("topology1", stormTopology, newEmptyConfig, newEmptyConfig) - topology1.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology1" + val topology1 = new GearpumpStormTopology("topology1", stormTopology, newEmptyConfig, + newEmptyConfig) + topology1.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology1" topology1.getStormConfig should not contain name - val topology2 = new GearpumpStormTopology("topology2", stormTopology, sysConfig, newEmptyConfig) - topology2.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology2" + val topology2 = new GearpumpStormTopology("topology2", stormTopology, sysConfig, + newEmptyConfig) + topology2.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology2" topology2.getStormConfig.get(name) shouldBe sysVal val topology3 = new GearpumpStormTopology("topology3", stormTopology, sysConfig, appConfig) - topology3.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology3" + topology3.getStormConfig.get(Config.TOPOLOGY_NAME) shouldBe "topology3" topology3.getStormConfig.get(name) shouldBe appVal - } "create Gearpump processors from Storm topology" in { @@ -63,12 +64,12 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar val gearpumpStormTopology = GearpumpStormTopology("app", stormTopology, null) val processors = gearpumpStormTopology.getProcessors - stormTopology.get_spouts().foreach { case (spoutId, _) => + stormTopology.get_spouts().asScala.foreach { case (spoutId, _) => val processor = processors(spoutId) processor.taskClass shouldBe classOf[StormProducer] processor.description shouldBe spoutId } - stormTopology.get_bolts().foreach { case (boltId, _) => + stormTopology.get_bolts().asScala.foreach { case (boltId, _) => val processor = processors(boltId) processor.taskClass shouldBe classOf[StormProcessor] processor.description shouldBe boltId @@ -88,7 +89,6 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar targets1 should contain key "3" } } - } object GearpumpStormTopologySpec { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala index 4ccdd39..f454145 100644 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,17 +18,17 @@ package io.gearpump.experiments.storm.topology import java.util.{List => JList} +import scala.collection.JavaConverters._ import backtype.storm.task.GeneralTopologyContext import backtype.storm.tuple.Fields -import io.gearpump.TimeStamp import org.mockito.Mockito._ import org.scalacheck.Gen -import org.scalatest.{Matchers, PropSpec} import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks -import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ +import org.scalatest.{Matchers, PropSpec} + +import io.gearpump.TimeStamp class GearpumpTupleSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { @@ -42,14 +42,14 @@ class GearpumpTupleSpec extends PropSpec with PropertyChecks with Matchers with forAll(tupleGen, Gen.alphaStr, Gen.chooseNum[Long](0, Long.MaxValue)) { (gearpumpTuple: GearpumpTuple, componentId: String, timestamp: TimeStamp) => val topologyContext = mock[GeneralTopologyContext] - val fields = new Fields(gearpumpTuple.values.map(_.asInstanceOf[String]): _*) + val fields = new Fields(gearpumpTuple.values.asScala.map(_.asInstanceOf[String]): _*) when(topologyContext.getComponentId(gearpumpTuple.sourceTaskId)).thenReturn(componentId) when(topologyContext.getComponentOutputFields( componentId, gearpumpTuple.sourceStreamId)).thenReturn(fields) val tuple = gearpumpTuple.toTuple(topologyContext, timestamp) - tuple shouldBe a [TimedTuple] + tuple shouldBe a[TimedTuple] val timedTuple = tuple.asInstanceOf[TimedTuple] timedTuple.getValues shouldBe gearpumpTuple.values timedTuple.getSourceTask shouldBe gearpumpTuple.sourceTaskId @@ -60,5 +60,4 @@ class GearpumpTupleSpec extends PropSpec with PropertyChecks with Matchers with timedTuple.timestamp shouldBe timestamp } } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala index 4efe06c..27c02fe 100644 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GraphBuilderSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,13 +18,14 @@ package io.gearpump.experiments.storm.util +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, WordSpec} + import io.gearpump.experiments.storm.partitioner.StormPartitioner import io.gearpump.experiments.storm.topology.GearpumpStormTopology import io.gearpump.streaming.Processor import io.gearpump.streaming.task.Task -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{Matchers, WordSpec} class GraphBuilderSpec extends WordSpec with Matchers with MockitoSugar { @@ -47,7 +48,7 @@ class GraphBuilderSpec extends WordSpec with Matchers with MockitoSugar { graph.edges.size shouldBe 1 val (from, edge, to) = graph.edges.head from shouldBe sourceProcessor - edge shouldBe a [StormPartitioner] + edge shouldBe a[StormPartitioner] to shouldBe targetProcessor } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala index 7566f2f..599ea8d 100644 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/GrouperSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,18 +19,19 @@ package io.gearpump.experiments.storm.util import java.util.{List => JList} +import scala.collection.JavaConverters._ + import backtype.storm.generated.GlobalStreamId import backtype.storm.grouping.CustomStreamGrouping import backtype.storm.task.TopologyContext import backtype.storm.tuple.Fields -import io.gearpump.experiments.storm.util.GrouperSpec.Value import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar -import org.scalatest.{Matchers, PropSpec} import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} -import scala.collection.JavaConverters._ +import io.gearpump.experiments.storm.util.GrouperSpec.Value class GrouperSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { @@ -77,7 +78,7 @@ class GrouperSpec extends PropSpec with PropertyChecks with Matchers with Mockit val grouper = new FieldsGrouper(outFields, groupFields, numTasks) grouper.getPartitions(taskId, values.toList.asJava.asInstanceOf[JList[AnyRef]]) - }.distinct.size shouldBe numTasks + }.distinct.size shouldBe numTasks } } @@ -103,8 +104,8 @@ class GrouperSpec extends PropSpec with PropertyChecks with Matchers with Mockit verify(grouping).prepare(topologyContext, globalStreamId, sourceTasks) - forAll(taskIdGen, valuesGen, numTasksGen) { (taskId: Int, values: JList[AnyRef], numTasks: Int) => - 0.until(numTasks).foreach { i => + forAll(taskIdGen, valuesGen, numTasksGen) {(taskId: Int, values: JList[AnyRef], taskNum: Int) => + 0.until(taskNum).foreach { i => when(grouping.chooseTasks(taskId, values)).thenReturn(List(new Integer(i)).asJava) grouper.getPartitions(taskId, values) shouldBe List(i) } @@ -114,7 +115,17 @@ class GrouperSpec extends PropSpec with PropertyChecks with Matchers with Mockit object GrouperSpec { class Value(val i: Int) extends AnyRef { + override def toString: String = s"$i" + override def hashCode(): Int = i + + override def equals(other: Any): Boolean = { + if (other.isInstanceOf[Value]) { + this.i == other.asInstanceOf[Value].i + } else { + false + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala index c75e92c..1a28694 100644 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,21 +17,23 @@ */ package io.gearpump.experiments.storm.util -import java.util.{Map => JMap, List => JList} +import java.util.{List => JList, Map => JMap} +import scala.collection.JavaConverters._ + import backtype.storm.generated.Grouping -import io.gearpump._ -import io.gearpump.experiments.storm.topology.GearpumpTuple -import io.gearpump.streaming.MockUtil import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar -import org.scalatest.{Matchers, PropSpec} import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} -import scala.collection.JavaConverters._ +import io.gearpump._ +import io.gearpump.experiments.storm.topology.GearpumpTuple +import io.gearpump.streaming.MockUtil -class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { +class StormOutputCollectorSpec + extends PropSpec with PropertyChecks with Matchers with MockitoSugar { val stormTaskId = 0 val streamIdGen = Gen.alphaStr @@ -43,13 +45,15 @@ class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matcher (timestamp: TimeStamp, streamId: String, values: JList[AnyRef]) => val targets = mock[JMap[String, JMap[String, Grouping]]] val taskToComponent = mock[JMap[Integer, String]] - val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer])] + val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => + (Map[String, Array[Int]], JList[Integer])] val targetPartitions = mock[Map[String, Array[Int]]] val targetStormTaskIds = mock[JList[Integer]] - when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions, targetStormTaskIds)) + when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions, + targetStormTaskIds)) val taskContext = MockUtil.mockTaskContext - val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent, targets, getTargetPartitionsFn, - taskContext, LatestTime) + val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent, + targets, getTargetPartitionsFn, taskContext, LatestTime) when(targets.containsKey(streamId)).thenReturn(false) stormOutputCollector.emit(streamId, values) shouldBe StormOutputCollector.EMPTY_LIST @@ -59,12 +63,11 @@ class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matcher stormOutputCollector.setTimestamp(timestamp) stormOutputCollector.emit(streamId, values) shouldBe targetStormTaskIds verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({ - case Message(tuple: GearpumpTuple, t) => - val expected = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions) - tuple == expected && t == timestamp + case Message(tuple: GearpumpTuple, t) => + val expected = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions) + tuple == expected && t == timestamp })) } - } property("StormOutputCollector emit direct to a task") { @@ -75,13 +78,15 @@ class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matcher val targets = mock[JMap[String, JMap[String, Grouping]]] val taskToComponent = mock[JMap[Integer, String]] when(taskToComponent.get(id)).thenReturn(target) - val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer])] + val getTargetPartitionsFn = mock[(String, JList[AnyRef]) => + (Map[String, Array[Int]], JList[Integer])] val targetPartitions = mock[Map[String, Array[Int]]] val targetStormTaskIds = mock[JList[Integer]] - when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions, targetStormTaskIds)) + when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions, + targetStormTaskIds)) val taskContext = MockUtil.mockTaskContext - val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent, targets, getTargetPartitionsFn, - taskContext, LatestTime) + val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent, + targets, getTargetPartitionsFn, taskContext, LatestTime) when(targets.containsKey(streamId)).thenReturn(false) verify(taskContext, times(0)).output(anyObject[Message]) @@ -91,11 +96,14 @@ class StormOutputCollectorSpec extends PropSpec with PropertyChecks with Matcher stormOutputCollector.emitDirect(id, streamId, values) val partitions = Array(StormUtil.stormTaskIdToGearpump(id).index) verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({ - case Message(tuple: GearpumpTuple, t) => - val expected = new GearpumpTuple(values, stormTaskId, streamId, Map(target -> partitions)) - tuple == expected && t == timestamp + case Message(tuple: GearpumpTuple, t) => { + val expected = new GearpumpTuple(values, stormTaskId, streamId, + Map(target -> partitions)) + + val result = tuple == expected && t == timestamp + result + } })) } } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala index f42b920..d8a29e8 100644 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,20 +19,20 @@ package io.gearpump.experiments.storm.util import java.util.{HashMap => JHashMap, List => JList, Map => JMap} +import scala.collection.JavaConverters._ import akka.actor.ExtendedActorSystem import backtype.storm.utils.Utils import com.esotericsoftware.kryo.Kryo -import io.gearpump.cluster.UserConfig -import io.gearpump.experiments.storm.topology.GearpumpTuple -import io.gearpump.experiments.storm.util.StormConstants._ -import io.gearpump.streaming.MockUtil import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import scala.collection.JavaConverters._ +import io.gearpump.cluster.UserConfig +import io.gearpump.experiments.storm.topology.GearpumpTuple +import io.gearpump.experiments.storm.util.StormConstants._ +import io.gearpump.streaming.MockUtil class StormSerializerPoolSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { @@ -44,7 +44,7 @@ class StormSerializerPoolSpec extends PropSpec with PropertyChecks with Matchers val stormConfig = Utils.readDefaultConfig.asInstanceOf[JMap[AnyRef, AnyRef]] val config = UserConfig.empty.withValue[JMap[AnyRef, AnyRef]](STORM_CONFIG, stormConfig) serializerPool.init(system, config) - serializerPool.get shouldBe a [StormSerializer] + serializerPool.get shouldBe a[StormSerializer] } property("StormSerializer should serialize and deserialize GearpumpTuple") { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala index 610b701..a9b93c9 100644 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/StormUtilSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,27 +20,25 @@ package io.gearpump.experiments.storm.util import java.lang.{Boolean => JBoolean, Long => JLong} import java.util.{HashMap => JHashMap, Map => JMap} +import scala.collection.JavaConverters._ import backtype.storm.Config import backtype.storm.generated.StormTopology -import io.gearpump.cluster.UserConfig -import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout} -import io.gearpump.experiments.storm.util.StormConstants._ -import io.gearpump.experiments.storm.util.StormUtil._ -import io.gearpump.streaming.MockUtil -import io.gearpump.streaming.task.TaskId import org.apache.storm.shade.org.json.simple.JSONValue import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ +import io.gearpump.cluster.UserConfig +import io.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout} +import io.gearpump.experiments.storm.util.StormConstants._ +import io.gearpump.experiments.storm.util.StormUtil._ +import io.gearpump.streaming.MockUtil +import io.gearpump.streaming.task.TaskId class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - property("convert Storm task ids to gearpump TaskIds and back") { val idGen = Gen.chooseNum[Int](0, Int.MaxValue) forAll(idGen) { (stormTaskId: Int) => @@ -60,23 +58,23 @@ class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with Mock val topology = TopologyUtil.getTestTopology implicit val actorSystem = taskContext.system val userConfig = UserConfig.empty - .withValue[StormTopology](STORM_TOPOLOGY, topology) - .withValue[JMap[AnyRef, AnyRef]](STORM_CONFIG, new JHashMap[AnyRef, AnyRef]) - topology.get_spouts foreach { case (spoutId, _) => + .withValue[StormTopology](STORM_TOPOLOGY, topology) + .withValue[JMap[AnyRef, AnyRef]](STORM_CONFIG, new JHashMap[AnyRef, AnyRef]) + topology.get_spouts.asScala.foreach { case (spoutId, _) => val config = userConfig.withString(STORM_COMPONENT, spoutId) val component = getGearpumpStormComponent(taskContext, config)(taskContext.system) - component shouldBe a [GearpumpSpout] + component shouldBe a[GearpumpSpout] } - topology.get_bolts foreach { case (boltId, _) => + topology.get_bolts.asScala.foreach { case (boltId, _) => val config = userConfig.withString(STORM_COMPONENT, boltId) val component = getGearpumpStormComponent(taskContext, config)(taskContext.system) - component shouldBe a [GearpumpBolt] + component shouldBe a[GearpumpBolt] } } property("parse json to map") { val mapGen = Gen.listOf[String](Gen.alphaStr) - .map(_.map(s => (s, s)).toMap.asJava.asInstanceOf[JMap[AnyRef, AnyRef]]) + .map(_.map(s => (s, s)).toMap.asJava.asInstanceOf[JMap[AnyRef, AnyRef]]) forAll(mapGen) { (map: JMap[AnyRef, AnyRef]) => parseJsonStringToMap(JSONValue.toJSONString(map)) shouldBe map @@ -86,7 +84,7 @@ class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with Mock forAll(invalidJsonGen) { (invalidJson: String) => val map = parseJsonStringToMap(invalidJson) map shouldBe empty - map shouldBe a [JMap[_, _]] + map shouldBe a[JMap[_, _]] } } @@ -109,7 +107,7 @@ class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with Mock forAll(Gen.alphaStr) { (s: String) => conf.put(name, s) - an [IllegalArgumentException] should be thrownBy getInt(conf, name) + an[IllegalArgumentException] should be thrownBy getInt(conf, name) } } @@ -127,7 +125,7 @@ class StormUtilSpec extends PropSpec with PropertyChecks with Matchers with Mock forAll(Gen.alphaStr) { (s: String) => conf.put(name, s) - an [IllegalArgumentException] should be thrownBy getBoolean(conf, name) + an[IllegalArgumentException] should be thrownBy getBoolean(conf, name) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/TopologyUtil.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/TopologyUtil.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/TopologyUtil.scala index 24cce69..8491786 100644 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/TopologyUtil.scala +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/util/TopologyUtil.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala index 989cfaa..6618b48 100644 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala +++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/Constants.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala index 6c38bde..af871ab 100644 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala +++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/Command.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,11 +19,13 @@ package io.gearpump.experiments.yarn.appmaster import com.typesafe.config.Config + import io.gearpump.cluster.main.{Master, Worker} import io.gearpump.experiments.yarn.Constants._ import io.gearpump.transport.HostPort import io.gearpump.util.Constants +/** Command to start a YARN container */ trait Command { def get: String override def toString: String = get @@ -32,22 +34,26 @@ trait Command { abstract class AbstractCommand extends Command { protected def config: Config def version: String - def classPath = Array( - s"conf", - s"pack/$version/conf", - s"pack/$version/lib/daemon/*", - s"pack/$version/lib/*" - ) + def classPath: Array[String] = { + Array( + s"conf", + s"pack/$version/conf", + s"pack/$version/lib/daemon/*", + s"pack/$version/lib/*" + ) + } - protected def buildCommand(java: String, properties: Array[String], mainClazz: String, cliOpts: Array[String]):String = { + protected def buildCommand( + java: String, properties: Array[String], mainClazz: String, cliOpts: Array[String]) + : String = { val exe = config.getString(java) s"$exe -cp ${classPath.mkString(":")}:" + "$CLASSPATH " + properties.mkString(" ") + - s" $mainClazz ${cliOpts.mkString(" ")} 2>&1 | /usr/bin/tee -a ${LOG_DIR_EXPANSION_VAR}/stderr" + s" $mainClazz ${cliOpts.mkString(" ")} 2>&1 | /usr/bin/tee -a ${LOG_DIR_EXPANSION_VAR}/stderr" } - protected def clazz(any: AnyRef) : String = { + protected def clazz(any: AnyRef): String = { val name = any.getClass.getName if (name.endsWith("$")) { name.dropRight(1) @@ -57,10 +63,11 @@ abstract class AbstractCommand extends Command { } } -case class MasterCommand(config: Config, version: String, masterAddr: HostPort) extends AbstractCommand { +case class MasterCommand(config: Config, version: String, masterAddr: HostPort) + extends AbstractCommand { def get: String = { - val masterArguments = Array(s"-ip ${masterAddr.host}", s"-port ${masterAddr.port}") + val masterArguments = Array(s"-ip ${masterAddr.host}", s"-port ${masterAddr.port}") val properties = Array( s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=${masterAddr.host}:${masterAddr.port}", @@ -74,7 +81,8 @@ case class MasterCommand(config: Config, version: String, masterAddr: HostPort) } } -case class WorkerCommand(config: Config, version: String, masterAddr: HostPort, workerHost: String) extends AbstractCommand { +case class WorkerCommand(config: Config, version: String, masterAddr: HostPort, workerHost: String) + extends AbstractCommand { def get: String = { val properties = Array( @@ -85,11 +93,12 @@ case class WorkerCommand(config: Config, version: String, masterAddr: HostPort, s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}", s"-D${Constants.GEARPUMP_HOSTNAME}=$workerHost") - buildCommand(WORKER_COMMAND, properties, clazz(Worker), Array.empty[String]) + buildCommand(WORKER_COMMAND, properties, clazz(Worker), Array.empty[String]) } } -case class AppMasterCommand(config: Config, version: String, args: Array[String]) extends AbstractCommand { +case class AppMasterCommand(config: Config, version: String, args: Array[String]) + extends AbstractCommand { override val classPath = Array( "conf", http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala index 92f85b6..6dd5011 100644 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala +++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/UIService.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,28 +18,30 @@ package io.gearpump.experiments.yarn.appmaster +import scala.concurrent.Future + import akka.actor._ import com.typesafe.config.{ConfigFactory, ConfigValueFactory} + import io.gearpump.cluster.ClusterConfig import io.gearpump.services.main.Services import io.gearpump.transport.HostPort import io.gearpump.util.{ActorUtil, Constants, LogUtil} -import scala.concurrent.Future - trait UIFactory { def props(masters: List[HostPort], host: String, port: Int): Props } +/** Wrapper of UI server */ class UIService(masters: List[HostPort], host: String, port: Int) extends Actor { private val LOG = LogUtil.getLogger(getClass) private val supervisor = ActorUtil.getFullPath(context.system, context.parent.path) private var configFile: java.io.File = null - implicit val dispatcher = context.dispatcher + private implicit val dispatcher = context.dispatcher - override def postStop: Unit = { + override def postStop(): Unit = { if (configFile != null) { configFile.delete() configFile = null @@ -51,22 +53,20 @@ class UIService(masters: List[HostPort], host: String, port: Int) extends Actor } override def preStart(): Unit = { - Future(start) + Future(start()) } - def start: Unit = { + def start(): Unit = { val mastersArg = masters.mkString(",") LOG.info(s"Launching services -master $mastersArg") configFile = java.io.File.createTempFile("uiserver", ".conf") - val config = context.system.settings.config. withValue(Constants.GEARPUMP_SERVICE_HOST, ConfigValueFactory.fromAnyRef(host)). withValue(Constants.GEARPUMP_SERVICE_HTTP, ConfigValueFactory.fromAnyRef(port.toString)). withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(host)) - ClusterConfig.saveConfig(config, configFile) val master = masters.head @@ -75,9 +75,10 @@ class UIService(masters: List[HostPort], host: String, port: Int) extends Actor launch(supervisor, master.host, master.port, configFile.toString) } + // Launch the UI server def launch(supervisor: String, masterHost: String, masterPort: Int, configFile: String): Unit = { Services.main(Array("-supervisor", supervisor, "-master", s"$masterHost:$masterPort" - , "-conf", configFile)) + , "-conf", configFile)) } override def receive: Receive = { @@ -86,7 +87,7 @@ class UIService(masters: List[HostPort], host: String, port: Int) extends Actor } } -object UIService extends UIFactory{ +object UIService extends UIFactory { override def props(masters: List[HostPort], host: String, port: Int): Props = { Props(new UIService(masters, host, port)) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala index f8982ce..1df7fb9 100644 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala +++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,10 +20,17 @@ package io.gearpump.experiments.yarn.appmaster import java.io.IOException import java.util.concurrent.TimeUnit +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration.Duration import akka.actor._ import akka.util.Timeout import com.typesafe.config.ConfigValueFactory +import org.apache.commons.httpclient.HttpClient +import org.apache.commons.httpclient.methods.GetMethod +import org.slf4j.Logger + import io.gearpump.cluster.ClientToMaster._ import io.gearpump.cluster.ClusterConfig import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} @@ -32,13 +39,16 @@ import io.gearpump.experiments.yarn.glue.Records._ import io.gearpump.experiments.yarn.glue.{NMClient, RMClient, YarnConfig} import io.gearpump.transport.HostPort import io.gearpump.util._ -import org.apache.commons.httpclient.HttpClient -import org.apache.commons.httpclient.methods.GetMethod -import org.slf4j.Logger +/** + * YARN AppMaster. YARN AppMaster is responsible to start Gearpump masters, workers, UI server as + * YARN containers. + * + * NOTE: It is different with Gearpump AppMaster. Gearpump AppMaster is a sub-process of worker. + */ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient, - packagePath: String, hdfsConfDir: String, - uiFactory: UIFactory) + packagePath: String, hdfsConfDir: String, + uiFactory: UIFactory) extends Actor { private val LOG: Logger = LogUtil.getLogger(getClass) @@ -47,11 +57,11 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient, private var uiStarted = false private val host = akkaConf.getString(Constants.GEARPUMP_HOSTNAME) - val port = Util.findFreePort.get + private val port = Util.findFreePort().get private val trackingURL = "http://" + host + ":" + port - //TODO: for now, only one master is supported. + // TODO: for now, only one master is supported. private val masterCount = 1 private val masterMemory = akkaConf.getString(MASTER_MEMORY).toInt private val masterVCores = akkaConf.getString(MASTER_VCORES).toInt @@ -67,7 +77,7 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient, def receive: Receive = null - private def registerAppMaster: Unit = { + private def registerAppMaster(): Unit = { val target = host + ":" + port rmClient.registerAppMaster(host, port, trackingURL) } @@ -75,7 +85,7 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient, registerAppMaster context.become(waitForAppMasterRegistered) - import YarnAppMaster._ + import io.gearpump.experiments.yarn.appmaster.YarnAppMaster._ def waitForAppMasterRegistered: Receive = { case AppMasterRegistered => @@ -86,15 +96,16 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient, private def startingMasters(remain: Int, masters: List[MasterInfo]): Receive = box { case ContainersAllocated(containers) => - LOG.info(s"ContainersAllocated: containers allocated for master(remain=$remain), count: " + containers.size) + LOG.info(s"ContainersAllocated: containers allocated for master(remain=$remain), count: " + + containers.size) val count = Math.min(containers.length, remain) - val newMasters = (0 until count).toList.map{index => + val newMasters = (0 until count).toList.map { index => val container = containers(index) MasterInfo(container.getId, container.getNodeId, launchMaster(container)) } - // stop un-used containers - containers.drop(count).map{container => + // Stops un-used containers + containers.drop(count).map { container => nmClient.stopContainer(container.getId, container.getNodeId) } @@ -113,26 +124,28 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient, onError orElse receive orElse unHandled } - private def startingWorkers(remain: Int, masters: List[MasterInfo], workers: List[WorkerInfo]): Receive = + private def startingWorkers(remain: Int, masters: List[MasterInfo], workers: List[WorkerInfo]) + : Receive = { box { case ContainersAllocated(containers) => - LOG.info(s"ContainersAllocated: containers allocated for workers(remain=$remain), count: " + containers.size) + LOG.info(s"ContainersAllocated: containers allocated for workers(remain=$remain), " + + s"count: " + containers.size) val count = Math.min(containers.length, remain) - val newWorkers = (0 until count).toList.map{index => + val newWorkers = (0 until count).toList.map { index => val container = containers(index) launchWorker(container, masters) WorkerInfo(container.getId, container.getNodeId) } - // stop un-used containers - containers.drop(count).map{container => + // Stops un-used containers + containers.drop(count).map { container => nmClient.stopContainer(container.getId, container.getNodeId) } context.become(startingWorkers(remain, masters, workers ++ newWorkers)) case ContainerStarted(containerId) => LOG.info(s"ContainerStarted: container $containerId started for worker(remain=$remain)") - // the last one + // The last one if (remain > 1) { context.become(startingWorkers(remain - 1, masters, workers)) } else { @@ -143,21 +156,22 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient, context.become(service(effectiveConfig(masters.map(_.host)), masters, workers)) } } - - import scala.collection.JavaConversions._ + } private def effectiveConfig(masters: List[HostPort]): Config = { val masterList = masters.map(pair => s"${pair.host}:${pair.port}") val config = context.system.settings.config - config.withValue(Constants.GEARPUMP_CLUSTER_MASTERS, ConfigValueFactory.fromIterable(masterList)) + config.withValue(Constants.GEARPUMP_CLUSTER_MASTERS, + ConfigValueFactory.fromIterable(masterList.asJava)) } private def onError: Receive = { case ContainersCompleted(containers) => - //TODO: we should recover the failed container from this... + // TODO: we should recover the failed container from this... containers.foreach { status => if (status.getExitStatus != 0) { - LOG.error(s"ContainersCompleted: container ${status.getContainerId} failed with exit code ${status.getExitStatus}, msg: ${status.getDiagnostics}") + LOG.error(s"ContainersCompleted: container ${status.getContainerId}" + + s" failed with exit code ${status.getExitStatus}, msg: ${status.getDiagnostics}") } else { LOG.info(s"ContainersCompleted: container ${status.getContainerId} completed") } @@ -178,22 +192,24 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient, self ! ShutdownApplication } - private def service(config: Config, masters: List[MasterInfo], workers: List[WorkerInfo]): Receive = box { + private def service(config: Config, masters: List[MasterInfo], workers: List[WorkerInfo]) + : Receive = box { case GetActiveConfig(clientHost) => LOG.info("GetActiveConfig: Get active configuration for client: " + clientHost) - val filtered = ClusterConfig.filterOutDefaultConfig(config.withValue(Constants.GEARPUMP_HOSTNAME, - ConfigValueFactory.fromAnyRef(clientHost))) + val filtered = ClusterConfig.filterOutDefaultConfig( + config.withValue(Constants.GEARPUMP_HOSTNAME, + ConfigValueFactory.fromAnyRef(clientHost))) sender ! ActiveConfig(filtered) case QueryVersion => LOG.info("QueryVersion") sender ! Version(Util.version) case QueryClusterInfo => LOG.info("QueryClusterInfo") - val masterContainers = masters.map{master => + val masterContainers = masters.map { master => master.id.toString + s"(${master.nodeId.toString})" } - val workerContainers = workers.map{worker=> + val workerContainers = workers.map { worker => worker.id.toString + s"(${worker.nodeId.toString})" } sender ! ClusterInfo(masterContainers, workerContainers) @@ -242,7 +258,7 @@ class YarnAppMaster(rmClient: RMClient, nmClient: NMClient, LOG.info(s"Launch Master on container " + container.getNodeHttpAddress) val host = container.getNodeId.getHost - val port = Util.findFreePort.get + val port = Util.findFreePort().get LOG.info("=============PORT" + port) val masterCommand = MasterCommand(akkaConf, rootPath, HostPort(host, port)) @@ -297,7 +313,7 @@ object YarnAppMaster extends AkkaApp with ArgumentsParser { nmClient, packagePath, confDir, UIService))) val daemon = system.actorOf(Props(new Daemon(appMaster))) - system.awaitTermination() + Await.result(system.whenTerminated, Duration.Inf) LOG.info("YarnAppMaster is shutdown") } @@ -307,8 +323,9 @@ object YarnAppMaster extends AkkaApp with ArgumentsParser { override def receive: Actor.Receive = { case Terminated(actor) => if (actor.compareTo(appMaster) == 0) { - LOG.info(s"YarnAppMaster ${appMaster.path.toString} is terminated, shutting down current ActorSystem") - context.system.shutdown() + LOG.info(s"YarnAppMaster ${appMaster.path.toString} is terminated, " + + s"shutting down current ActorSystem") + context.system.terminate() context.stop(self) } } @@ -353,7 +370,7 @@ object YarnAppMaster extends AkkaApp with ArgumentsParser { var status = client.executeMethod(get) if (status != 200) { - // sleep a little bit, and try again + // Sleeps a little bit, and try again Thread.sleep(3000) status = client.executeMethod(get) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala index 9f30570..49ec3a0 100644 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala +++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,19 +19,18 @@ package io.gearpump.experiments.yarn.client import java.io.IOException +import scala.util.Try import akka.actor.{ActorRef, ActorSystem} -import io.gearpump.experiments.yarn.glue.Records.ApplicationId -import io.gearpump.experiments.yarn.glue.YarnClient -import io.gearpump.util.{AkkaHelper, LogUtil} import org.apache.commons.httpclient.HttpClient import org.apache.commons.httpclient.methods.GetMethod -import org.slf4j.Logger -import scala.util.Try +import io.gearpump.experiments.yarn.glue.Records.ApplicationId +import io.gearpump.experiments.yarn.glue.YarnClient +import io.gearpump.util.{AkkaHelper, LogUtil} /** - * Resolve AppMaster ActorRef + * Resolves AppMaster ActorRef */ class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) { val LOG = LogUtil.getLogger(getClass) @@ -67,7 +66,8 @@ class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) { index += 1 val tryConnect = Try(fun) if (tryConnect.isFailure) { - Console.err.println(s"Failed to connect YarnAppMaster(tried $index)... " + tryConnect.failed.get.getMessage) + LOG.error(s"Failed to connect YarnAppMaster(tried $index)... " + + tryConnect.failed.get.getMessage) } else { result = tryConnect.get } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala index 2c3f99e..7c3bc38 100644 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala +++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/Client.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,10 +16,12 @@ * limitations under the License. */ package io.gearpump.experiments.yarn.client -import io.gearpump.util.LogUtil import org.slf4j.Logger -object Client { +import io.gearpump.util.LogUtil + +/** Command line tool to launch a Gearpump cluster on YARN, and also to manage Gearpump cluster */ +object Client { private val LOG: Logger = LogUtil.getLogger(getClass) val LAUNCH = "launch" @@ -27,14 +29,16 @@ object Client { val commands = Map(LAUNCH -> LaunchCluster) ++ ManageCluster.commands.map(key => (key, ManageCluster)).toMap - def usage: Unit = { + def usage(): Unit = { val keys = commands.keys.toList.sorted + // scalastyle:off println Console.err.println("Usage: " + "<" + keys.mkString("|") + ">") + // scalastyle:on println } - def main(args: Array[String]) = { + def main(args: Array[String]): Unit = { if (args.length == 0) { - usage + usage() } else { val key = args(0) val command = commands.get(key) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala index 0ff4f47..ea5e707 100644 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala +++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/LaunchCluster.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,9 +20,13 @@ package io.gearpump.experiments.yarn.client import java.io.{File, IOException, OutputStreamWriter} import java.net.InetAddress import java.util.zip.ZipInputStream -import scala.concurrent.Future +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} + import akka.actor.ActorSystem import com.typesafe.config.{Config, ConfigValueFactory} +import org.slf4j.Logger + import io.gearpump.cluster.ClusterConfig import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} import io.gearpump.experiments.yarn @@ -33,13 +37,9 @@ import io.gearpump.experiments.yarn.glue.Records.{ApplicationId, Resource} import io.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig} import io.gearpump.util.ActorUtil.askActor import io.gearpump.util.{AkkaApp, LogUtil, Util} -import org.slf4j.Logger - -import scala.concurrent.Await /** * Launch Gearpump on YARN - * */ class LaunchCluster( akka: Config, @@ -50,7 +50,7 @@ class LaunchCluster( appMasterResolver: AppMasterResolver, version: String = Util.version) { - import yarn.Constants._ + import io.gearpump.experiments.yarn.Constants._ private implicit val dispatcher = actorSystem.dispatcher private val LOG: Logger = LogUtil.getLogger(getClass) @@ -59,13 +59,13 @@ class LaunchCluster( private val memory = akka.getString(APPMASTER_MEMORY).toInt private val vcore = akka.getString(APPMASTER_VCORES).toInt - def submit(appName: String, packagePath: String): ApplicationId = { LOG.info("Starting AM") - // first step, check the version, to make sure local version matches remote version + // First step, check the version, to make sure local version matches remote version if (!packagePath.endsWith(".zip")) { - throw new IOException(s"YarnClient only support .zip distribution package, now it is ${packagePath}. Please download the zip " + + throw new IOException(s"YarnClient only support .zip distribution package," + + s" now it is ${packagePath}. Please download the zip " + "package from website or use sbt assembly packArchiveZip to build one.") } @@ -76,24 +76,30 @@ class LaunchCluster( val rootEntry = rootEntryPath(zip = packagePath) if (!rootEntry.contains(version)) { - throw new IOException(s"Check version failed! Local gearpump binary version $version doesn't match with remote path $packagePath") + throw new IOException(s"Check version failed! Local gearpump binary" + + s" version $version doesn't match with remote path $packagePath") } val resource = Resource.newInstance(memory, vcore) val appId = yarnClient.createApplication - // will upload the configs to HDFS home directory of current user. + // uploads the configs to HDFS home directory of current user. val configPath = uploadConfigToHDFS(appId) - val command = AppMasterCommand(akka, rootEntry, Array(s"-conf $configPath", s"-package $packagePath")) + val command = AppMasterCommand(akka, rootEntry, Array(s"-conf $configPath", + s"-package $packagePath")) yarnClient.submit(appName, appId, command.get, resource, queue, packagePath, configPath) LOG.info("Waiting application to finish...") val report = yarnClient.awaitApplication(appId, LaunchCluster.TIMEOUT_MILLISECONDS) - LOG.info(s"Application $appId finished with state ${report.getYarnApplicationState} at ${report.getFinishTime}, info: ${report.getDiagnostics}") + LOG.info(s"Application $appId finished with state ${report.getYarnApplicationState} " + + s"at ${report.getFinishTime}, info: ${report.getDiagnostics}") + + // scalastyle:off println Console.println("================================================") Console.println("==Application Id: " + appId) + // scalastyle:on println appId } @@ -103,21 +109,20 @@ class LaunchCluster( val appMaster = appMasterResolver.resolve(appId) LOG.info(s"appMaster=${appMaster.path} host=$host") val future = askActor[ActiveConfig](appMaster, GetActiveConfig(host)).map(_.config) - import scala.concurrent.duration._ - future.map{ config => - val out = new File(output) + future.map { config => + val out = new File(output) ClusterConfig.saveConfig(config, out) out } } private def uploadConfigToHDFS(appId: ApplicationId): String = { - // will use personal home directory so that it will not conflict with other users + // Uses personal home directory so that it will not conflict with other users // conf path pattern: /user/<userid>/.gearpump_application_<timestamp>_<id>/conf val confDir = s"${fs.getHomeDirectory}/.gearpump_${appId}/conf/" LOG.info(s"Uploading configuration files to remote HDFS(under $confDir)...") - // copy config from local to remote. + // Copies config from local to remote. val remoteConfFile = s"$confDir/gear.conf" var out = fs.create(remoteConfFile) var writer = new OutputStreamWriter(out) @@ -127,14 +132,14 @@ class LaunchCluster( writer.write(cleanedConfig.root().render()) writer.close() - // save yarn-site.xml to remote + // Saves yarn-site.xml to remote val yarn_site_xml = s"$confDir/yarn-site.xml" out = fs.create(yarn_site_xml) writer = new OutputStreamWriter(out) yarnConf.writeXml(writer) writer.close() - // save log4j.properties to remote + // Saves log4j.properties to remote val log4j_properties = s"$confDir/log4j.properties" val log4j = LogUtil.loadConfiguration out = fs.create(log4j_properties) @@ -164,17 +169,21 @@ object LaunchCluster extends AkkaApp with ArgumentsParser { } override val options: Array[(String, CLIOption[Any])] = Array( - PACKAGE -> CLIOption[String]("<Please specify the gearpump.zip package path on HDFS. If not specified, we will use default value /user/gearpump/gearpump.zip>", required = false), - NAME -> CLIOption[String]("<Application name showed in YARN>", required = false, defaultValue = Some("Gearpump")), - VERBOSE -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false)), - OUTPUT -> CLIOption("<output path for configuration file>", required = false, defaultValue = None) + PACKAGE -> CLIOption[String]("<Please specify the gearpump.zip package path on HDFS. " + + "If not specified, we will use default value /user/gearpump/gearpump.zip>", required = false), + NAME -> CLIOption[String]("<Application name showed in YARN>", required = false, + defaultValue = Some("Gearpump")), + VERBOSE -> CLIOption("<print verbose log on console>", required = false, + defaultValue = Some(false)), + OUTPUT -> CLIOption("<output path for configuration file>", required = false, + defaultValue = None) ) private val TIMEOUT_MILLISECONDS = 30 * 1000 override def main(inputAkkaConf: Config, args: Array[String]): Unit = { val parsed = parse(args) if (parsed.getBoolean(VERBOSE)) { - LogUtil.verboseLogToConsole + LogUtil.verboseLogToConsole() } val yarnConfig = new YarnConfig() @@ -184,24 +193,27 @@ object LaunchCluster extends AkkaApp with ArgumentsParser { val actorSystem = ActorSystem("launchCluster", akkaConf) val appMasterResolver = new AppMasterResolver(yarnClient, actorSystem) - val client = new LaunchCluster(akkaConf, yarnConfig, yarnClient, fs, actorSystem, appMasterResolver) + val client = new LaunchCluster(akkaConf, yarnConfig, yarnClient, fs, + actorSystem, appMasterResolver) val name = parsed.getString(NAME) val appId = client.submit(name, akkaConf.getString(Constants.PACKAGE_PATH)) if (parsed.exists(OUTPUT)) { import scala.concurrent.duration._ - Await.result(client.saveConfig(appId, parsed.getString(OUTPUT)), TIMEOUT_MILLISECONDS milliseconds) + Await.result(client.saveConfig(appId, parsed.getString(OUTPUT)), + TIMEOUT_MILLISECONDS.milliseconds) } - yarnClient.stop - actorSystem.shutdown() - actorSystem.awaitTermination() + yarnClient.stop() + actorSystem.terminate() + Await.result(actorSystem.whenTerminated, Duration.Inf) } private def updateConf(akka: Config, parsed: ParseResult): Config = { if (parsed.exists(PACKAGE)) { - akka.withValue(Constants.PACKAGE_PATH, ConfigValueFactory.fromAnyRef(parsed.getString(PACKAGE))) + akka.withValue(Constants.PACKAGE_PATH, + ConfigValueFactory.fromAnyRef(parsed.getString(PACKAGE))) } else { akka } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala index 20fd13e..3e50abe 100644 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala +++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,8 +20,10 @@ package io.gearpump.experiments.yarn.client import java.io.{File, IOException} import java.net.InetAddress +import scala.concurrent.{Await, Future} import akka.actor.{ActorRef, ActorSystem} + import io.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker} import io.gearpump.cluster.ClusterConfig import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} @@ -31,18 +33,23 @@ import io.gearpump.experiments.yarn.glue.{YarnClient, YarnConfig} import io.gearpump.util.ActorUtil.askActor import io.gearpump.util.{AkkaApp, LogUtil} -import scala.concurrent.{Await, Future} - +/** Manage current Gearpump cluster on YARN */ class ManageCluster(appId: ApplicationId, appMaster: ActorRef, system: ActorSystem) { - import ManageCluster._ + import io.gearpump.experiments.yarn.client.ManageCluster._ private val host = InetAddress.getLocalHost.getHostName implicit val dispatcher = system.dispatcher def getConfig: Future[ActiveConfig] = askActor[ActiveConfig](appMaster, GetActiveConfig(host)) def version: Future[Version] = askActor[Version](appMaster, QueryVersion) - def addWorker(count: Int): Future[CommandResult] = askActor[CommandResult](appMaster, AddWorker(count)) - def removeWorker(worker: String): Future[CommandResult] = askActor[CommandResult](appMaster, RemoveWorker(worker)) + def addWorker(count: Int): Future[CommandResult] = { + askActor[CommandResult](appMaster, AddWorker(count)) + } + + def removeWorker(worker: String): Future[CommandResult] = { + askActor[CommandResult](appMaster, RemoveWorker(worker)) + } + def shutdown: Future[CommandResult] = askActor[CommandResult](appMaster, Kill) def queryClusterInfo: Future[ClusterInfo] = askActor[ClusterInfo](appMaster, QueryClusterInfo) @@ -91,18 +98,23 @@ object ManageCluster extends AkkaApp with ArgumentsParser { val APPID = "appid" val VERBOSE = "verbose" - val commands = List(GET_CONFIG,ADD_WORKER, REMOVE_WORKER, KILL,VERSION, QUERY) + val commands = List(GET_CONFIG, ADD_WORKER, REMOVE_WORKER, KILL, VERSION, QUERY) import scala.concurrent.duration._ - val TIME_OUT_SECONDS = 30 seconds + val TIME_OUT_SECONDS = 30.seconds override val options: Array[(String, CLIOption[Any])] = Array( COMMAND -> CLIOption[String](s"<${commands.mkString("|")}>", required = true), - APPID ->CLIOption[String]("<Application id, format: application_timestamp_id>", required = true), - COUNT -> CLIOption("<how many instance to add or remove>", required = false, defaultValue = Some(1)), - VERBOSE -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false)), - OUTPUT -> CLIOption("<output path for configuration file>", required = false, defaultValue = Some("")), - CONTAINER -> CLIOption("<container id for master or worker>", required = false, defaultValue = Some("")) + APPID -> CLIOption[String]("<Application id, format: application_timestamp_id>", + required = true), + COUNT -> CLIOption("<how many instance to add or remove>", required = false, + defaultValue = Some(1)), + VERBOSE -> CLIOption("<print verbose log on console>", required = false, + defaultValue = Some(false)), + OUTPUT -> CLIOption("<output path for configuration file>", required = false, + defaultValue = Some("")), + CONTAINER -> CLIOption("<container id for master or worker>", required = false, + defaultValue = Some("")) ) override def main(akkaConf: Config, args: Array[String]): Unit = { @@ -113,7 +125,7 @@ object ManageCluster extends AkkaApp with ArgumentsParser { val parsed = parse(args) if (parsed.getBoolean(VERBOSE)) { - LogUtil.verboseLogToConsole + LogUtil.verboseLogToConsole() } val appId = parseAppId(parsed.getString(APPID)) @@ -128,9 +140,11 @@ object ManageCluster extends AkkaApp with ArgumentsParser { val command = parsed.getString(COMMAND) val result = manager.command(command, parsed) + // scalastyle:off println Console.println(Await.result(result, TIME_OUT_SECONDS)) - system.shutdown() - system.awaitTermination() + // scalastyle:on println + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) } def parseAppId(str: String): ApplicationId = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala index 93cee28..6b3385f 100644 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala +++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala @@ -1,4 +1,4 @@ -/*- +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,8 +20,8 @@ package io.gearpump.experiments.yarn.glue import java.io.File import java.nio.ByteBuffer +import scala.collection.JavaConverters._ -import io.gearpump.util.LogUtil import org.apache.hadoop.fs.{FileSystem => YarnFileSystem, Path} import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.mapreduce.security.TokenCache @@ -32,18 +32,19 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.ConverterUtils import org.slf4j.Logger -import scala.collection.JavaConversions._ +import io.gearpump.util.LogUtil -private [glue] +private[glue] object ContainerLaunchContext { private val LOG: Logger = LogUtil.getLogger(getClass) - def apply(yarnConf: YarnConfiguration, command: String, packagePath: String, configPath: String): ContainerLaunchContext = { + def apply(yarnConf: YarnConfiguration, command: String, packagePath: String, configPath: String) + : ContainerLaunchContext = { val context = Records.newRecord(classOf[ContainerLaunchContext]) - context.setCommands(Seq(command)) - context.setEnvironment(getAppEnv(yarnConf)) + context.setCommands(Seq(command).asJava) + context.setEnvironment(getAppEnv(yarnConf).asJava) context.setTokens(getToken(yarnConf, packagePath, configPath)) - context.setLocalResources(getAMLocalResourcesMap(yarnConf, packagePath, configPath)) + context.setLocalResources(getAMLocalResourcesMap(yarnConf, packagePath, configPath).asJava) context } @@ -60,7 +61,9 @@ object ContainerLaunchContext { Map(Environment.CLASSPATH.name -> allPaths.map(_.trim).mkString(File.pathSeparator)) } - private def getAMLocalResourcesMap(yarnConf: YarnConfiguration, packagePath: String, configPath: String): Map[String, LocalResource] = { + private def getAMLocalResourcesMap( + yarnConf: YarnConfiguration, packagePath: String, configPath: String) + : Map[String, LocalResource] = { val fs = getFs(yarnConf) Map( @@ -70,8 +73,9 @@ object ContainerLaunchContext { LocalResourceType.FILE, LocalResourceVisibility.APPLICATION)) } - private def newYarnAppResource(fs: YarnFileSystem, path: Path, - resourceType: LocalResourceType, vis: LocalResourceVisibility): LocalResource = { + private def newYarnAppResource( + fs: YarnFileSystem, path: Path, + resourceType: LocalResourceType, vis: LocalResourceVisibility): LocalResource = { val qualified = fs.makeQualified(path) val status = fs.getFileStatus(qualified) val resource = Records.newRecord(classOf[LocalResource]) @@ -83,7 +87,8 @@ object ContainerLaunchContext { resource } - private def getToken(yc: YarnConfiguration, packagePath: String, configPath: String): ByteBuffer = { + private def getToken(yc: YarnConfiguration, packagePath: String, configPath: String) + : ByteBuffer = { val credentials = UserGroupInformation.getCurrentUser.getCredentials val dob = new DataOutputBuffer val dirs = Array(new Path(packagePath), new Path(configPath)) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala index e3dfb7f..acf09ac 100644 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala +++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,11 +20,11 @@ package io.gearpump.experiments.yarn.glue import java.io.{InputStream, OutputStream} import java.net.ConnectException +import scala.util.{Failure, Success, Try} -import io.gearpump.util.LogUtil import org.apache.hadoop.fs.Path -import scala.util.{Success, Failure, Try} +import io.gearpump.util.LogUtil class FileSystem(yarnConfig: YarnConfig) { @@ -33,26 +33,26 @@ class FileSystem(yarnConfig: YarnConfig) { private def LOG = LogUtil.getLogger(getClass) - def open(file: String): InputStream = exceptionHandler{ + def open(file: String): InputStream = exceptionHandler { val path = new Path(file) fs.open(path) } - def create(file: String): OutputStream = exceptionHandler{ + def create(file: String): OutputStream = exceptionHandler { val path = new Path(file) fs.create(path) } - def exists(file: String): Boolean = exceptionHandler{ + def exists(file: String): Boolean = exceptionHandler { val path = new Path(file) fs.exists(path) } def name: String = { - fs.getName + fs.getUri.toString } - def getHomeDirectory: String = { + def getHomeDirectory: String = { fs.getHomeDirectory.toString } @@ -62,8 +62,10 @@ class FileSystem(yarnConfig: YarnConfig) { case Success(v) => v case Failure(ex) => if (ex.isInstanceOf[ConnectException]) { - LOG.error("Please check whether we connect to the right HDFS file system, current file system is $name." + - "\n. Please copy all configs under $HADOOP_HOME/etc/hadoop into conf/yarnconf directory of Gearpump package, so that we can use the right File system.", ex) + LOG.error("Please check whether we connect to the right HDFS file system, " + + "current file system is $name." + "\n. Please copy all configs under " + + "$HADOOP_HOME/etc/hadoop into conf/yarnconf directory of Gearpump package, " + + "so that we can use the right File system.", ex) } throw ex } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala index f0a5fc7..3e7e668 100644 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala +++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,12 +22,13 @@ import java.nio.ByteBuffer import akka.actor.ActorRef import com.typesafe.config.Config -import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.ContainerStarted -import io.gearpump.experiments.yarn.glue.Records._ -import io.gearpump.util.LogUtil import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource} import org.apache.hadoop.yarn.client.api.async.NMClientAsync import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl + +import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.ContainerStarted +import io.gearpump.experiments.yarn.glue.Records._ +import io.gearpump.util.LogUtil /** * Adapter for node manager client */ @@ -46,45 +47,49 @@ class NMClient(yarnConf: YarnConfig, config: Config) extends NMClientAsync.Callb client.start() } - private [glue] - override def onContainerStarted(containerId: YarnContainerId, allServiceResponse: java.util.Map[String, ByteBuffer]) { + private[glue] + override def onContainerStarted( + containerId: YarnContainerId, allServiceResponse: java.util.Map[String, ByteBuffer]) { LOG.info(s"Container started : $containerId, " + allServiceResponse) reportTo ! ContainerStarted(containerId) } - private [glue] - override def onContainerStatusReceived(containerId: YarnContainerId, containerStatus: YarnContainerStatus) { + private[glue] + override def onContainerStatusReceived( + containerId: YarnContainerId, containerStatus: YarnContainerStatus) { LOG.info(s"Container status received : $containerId, status $containerStatus") } - private [glue] + private[glue] override def onContainerStopped(containerId: YarnContainerId) { LOG.error(s"Container stopped : $containerId") } - private [glue] + private[glue] override def onGetContainerStatusError(containerId: YarnContainerId, throwable: Throwable) { LOG.error(s"Container exception : $containerId", throwable) } - private [glue] + private[glue] override def onStartContainerError(containerId: YarnContainerId, throwable: Throwable) { LOG.error(s"Container exception : $containerId", throwable) } - private [glue] + private[glue] override def onStopContainerError(containerId: YarnContainerId, throwable: Throwable) { LOG.error(s"Container exception : $containerId", throwable) } - def launchCommand(container: Container, command: String, packagePath: String, configPath: String): Unit = { - LOG.info(s"Launching command : $command on container: ${container.getId}, host ip : ${container.getNodeId.getHost}") + def launchCommand( + container: Container, command: String, packagePath: String, configPath: String): Unit = { + LOG.info(s"Launching command : $command on container" + + s": ${container.getId}, host ip : ${container.getNodeId.getHost}") val context = ContainerLaunchContext(yarnConf.conf, command, packagePath, configPath) client.startContainerAsync(container, context) } def stopContainer(containerId: ContainerId, nodeId: NodeId): Unit = { - LOG.info(s"Stop container ${containerId.toString} on node: ${nodeId.toString} " ) + LOG.info(s"Stop container ${containerId.toString} on node: ${nodeId.toString} ") client.stopContainerAsync(containerId, nodeId) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala index c1d26f6..7b9d83c 100644 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala +++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,16 +18,17 @@ package io.gearpump.experiments.yarn.glue +import scala.collection.JavaConverters._ + import akka.actor.ActorRef -import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{AppMasterRegistered, ContainersAllocated, ContainersCompleted, ResourceManagerException, ShutdownApplication} -import io.gearpump.experiments.yarn.glue.Records._ -import io.gearpump.util.LogUtil import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, FinalApplicationStatus, NodeId => YarnNodeId, NodeReport, Priority, Resource => YarnResource} import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync import org.slf4j.Logger -import scala.collection.JavaConverters._ +import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{AppMasterRegistered, ContainersAllocated, ContainersCompleted, ResourceManagerException, ShutdownApplication} +import io.gearpump.experiments.yarn.glue.Records._ +import io.gearpump.util.LogUtil /** * Adapter for resource manager client @@ -46,7 +47,7 @@ class RMClient(yarnConf: YarnConfig) extends AMRMClientAsync.CallbackHandler { } private def startAMRMClient: AMRMClientAsync[ContainerRequest] = { - val timeIntervalMs = 1000 //ms + val timeIntervalMs = 1000 // ms val amrmClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](timeIntervalMs, this) amrmClient.init(yarnConf.conf) amrmClient.start() @@ -59,16 +60,19 @@ class RMClient(yarnConf: YarnConfig) extends AMRMClientAsync.CallbackHandler { private[glue] override def onContainersAllocated(containers: java.util.List[YarnContainer]) { - val newContainers = containers.asScala.toList.filterNot(container => allocatedContainers.contains(container.getId)) + val newContainers = containers.asScala.toList.filterNot(container => + allocatedContainers.contains(container.getId)) allocatedContainers ++= newContainers.map(_.getId) LOG.info(s"New allocated ${newContainers.size} containers") reportTo ! ContainersAllocated(newContainers.map(yarnContainerToContainer(_))) } private[glue] - override def onContainersCompleted(completedContainers: java.util.List[YarnContainerStatus]): Unit = { + override def onContainersCompleted(completedContainers: java.util.List[YarnContainerStatus]) + : Unit = { LOG.info(s"Got response from RM. Completed containers=${completedContainers.size()}") - reportTo ! ContainersCompleted(completedContainers.asScala.toList.map(yarnContainerStatusToContainerStatus(_))) + reportTo ! ContainersCompleted( + completedContainers.asScala.toList.map(yarnContainerStatusToContainerStatus(_))) } private[glue]
