Repository: incubator-gearpump Updated Branches: refs/heads/master 882ea9aa8 -> a7ae62a12
fix GEARPUMP-155, DagManager serializes DAG before sending it to master Changes include: 1. DagManager serializes from DAG before saving to master and deserializes to DAG when loading from master 2. remove conflict Storm dependencies Author: manuzhang <[email protected]> Closes #38 from manuzhang/GEARPUMP-155. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/a7ae62a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/a7ae62a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/a7ae62a1 Branch: refs/heads/master Commit: a7ae62a12dce2ce150cdd9d426e8061955367444 Parents: 882ea9a Author: manuzhang <[email protected]> Authored: Sun Jun 12 14:06:28 2016 +0800 Committer: manuzhang <[email protected]> Committed: Sun Jun 12 14:06:28 2016 +0800 ---------------------------------------------------------------------- project/Build.scala | 1 + .../streaming/appmaster/DagManager.scala | 17 ++++++---- .../streaming/appmaster/ClockServiceSpec.scala | 10 +++--- .../streaming/appmaster/DagManagerSpec.scala | 35 +++++++++++--------- 4 files changed, 36 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a7ae62a1/project/Build.scala ---------------------------------------------------------------------- diff --git a/project/Build.scala b/project/Build.scala index 82f4e8f..c9f6356 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -416,6 +416,7 @@ object Build extends sbt.Build { exclude("org.mortbay.jetty", "jetty") exclude("org.ow2.asm", "asm") exclude("org.slf4j", "log4j-over-slf4j") + exclude("org.apache.logging.log4j", "log4j-slf4j-impl") exclude("ring", "ring-core") exclude("ring", "ring-devel") exclude("ring", "ring-jetty-adapter") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a7ae62a1/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala index 24f16ad..d6703e4 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala @@ -18,9 +18,10 @@ package org.apache.gearpump.streaming.appmaster -import akka.actor.{Actor, ActorRef, Stash} +import akka.actor.{ExtendedActorSystem, Actor, ActorRef, Stash} import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.romix.serialization.kryo.KryoSerializerWrapper import org.apache.gearpump.streaming._ import org.apache.gearpump.streaming.appmaster.DagManager._ import org.apache.gearpump.streaming.storage.AppDataStore @@ -48,13 +49,15 @@ class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: O private implicit val system = context.system private var watchers = List.empty[ActorRef] + private val serializer = new KryoSerializerWrapper(system.asInstanceOf[ExtendedActorSystem]) override def receive: Receive = null override def preStart(): Unit = { LOG.info("Initializing Dag Service, get stored Dag ....") - store.get(StreamApplication.DAG).asInstanceOf[Future[DAG]].map { storedDag => - if (storedDag != null) { + store.get(StreamApplication.DAG).asInstanceOf[Future[Array[Byte]]].map { bytes => + if (bytes != null) { + val storedDag = serializer.fromBinary(bytes).asInstanceOf[DAG] dags :+= storedDag } else { dags :+= dag.getOrElse(DAG(userConfig.getValue[Graph[ProcessorDescription, @@ -62,7 +65,7 @@ class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: O } maxProcessorId = { val keys = dags.head.processors.keys - if (keys.size == 0) { + if (keys.isEmpty) { 0 } else { keys.max @@ -96,11 +99,11 @@ class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: O case GetLatestDAG => // Get the latest version of DAG. sender ! LatestDAG(dags.last) - case GetTaskLaunchData(version, processorId, context) => + case GetTaskLaunchData(version, processorId, launchContext) => // Task information like Processor class, downstream subscriber processors and etc. dags.find(_.version == version).foreach { dag => LOG.info(s"Get task launcher data for processor: $processorId, dagVersion: $version") - sender ! taskLaunchData(dag, processorId, context) + sender ! taskLaunchData(dag, processorId, launchContext) } case ReplaceProcessor(oldProcessorId, inputNewProcessor, inheritConfig) => // Replace a processor with new implementation. The upstream processors and downstream @@ -142,7 +145,7 @@ class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: O // deployed. The obsolete dag versions will be removed. if (dagVersion != NOT_INITIALIZED) { dags = dags.filter(_.version == dagVersion) - store.put(StreamApplication.DAG, dags.last) + store.put(StreamApplication.DAG, serializer.toBinary(dags.last)) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a7ae62a1/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala index 2bb33b7..0729877 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala @@ -42,7 +42,7 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli val task2 = ProcessorDescription(id = 1, taskClass = classOf[TaskActor].getName, parallelism = 1) val dag = DAG(Graph(task1 ~ hash ~> task2)) - override def afterAll { + override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) } @@ -102,8 +102,8 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli clockService.tell(ChangeToNewDAG(dagAddMiddleNode), user.ref) val clocks = user.expectMsgPF() { - case ChangeToNewDAGSuccess(clocks) => - clocks + case ChangeToNewDAGSuccess(newDagClocks) => + newDagClocks } // For intermediate task, pick its upstream as initial clock @@ -129,7 +129,7 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli clockService ! GetStartClock expectMsg(StartClock(200L)) - val conf = UserConfig.empty.withBoolean("state.checkpoint.enable", true) + val conf = UserConfig.empty.withBoolean("state.checkpoint.enable", value = true) val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName, parallelism = 1, taskConf = conf) val task4 = ProcessorDescription(id = 4, taskClass = classOf[TaskActor].getName, @@ -234,7 +234,7 @@ object ClockServiceSpec { } def get(key: String): Future[Any] = { - Promise.successful(map.get(key).getOrElse(null)).future + Promise.successful(map.getOrElse(key, null)).future } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a7ae62a1/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala index 2caab4f..a8d1ff5 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala @@ -18,21 +18,26 @@ package org.apache.gearpump.streaming.appmaster -import akka.actor.{ActorSystem, Props} +import java.util.concurrent.TimeUnit + +import akka.actor.{ExtendedActorSystem, ActorSystem, Props} import akka.testkit.TestProbe import org.apache.gearpump.cluster.{TestUtil, UserConfig} import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} +import org.apache.gearpump.romix.serialization.kryo.KryoSerializerWrapper import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationFailed, DAGOperationSuccess, GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, ReplaceProcessor, TaskLaunchData, WatchChange} import org.apache.gearpump.streaming.task.{Subscriber, TaskActor} -import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, StreamApplication} +import org.apache.gearpump.streaming._ import org.apache.gearpump.util.Graph import org.apache.gearpump.util.Graph._ +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} -import scala.concurrent.Await -import scala.concurrent.duration.Duration +import scala.concurrent.{Promise, Future, Await} +import scala.concurrent.duration.{FiniteDuration, Duration} -class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll { +class DagManagerSpec extends WordSpecLike with Matchers with MockitoSugar with BeforeAndAfterAll { val hash = Partitioner[HashPartitioner] val task1 = ProcessorDescription(id = 1, taskClass = classOf[TaskActor].getName, parallelism = 1) @@ -65,43 +70,43 @@ class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll { client.send(dagManager, WatchChange(watcher.ref)) val task3 = task2.copy(id = 3, life = LifeTime(100, Long.MaxValue)) - client.send(dagManager, ReplaceProcessor(task2.id, task3, false)) + client.send(dagManager, ReplaceProcessor(task2.id, task3, inheritConf = false)) client.expectMsg(DAGOperationSuccess) client.send(dagManager, GetLatestDAG) val newDag = client.expectMsgPF() { - case LatestDAG(dag) => dag + case LatestDAG(latestDag) => latestDag } assert(newDag.processors.contains(task3.id)) watcher.expectMsgType[LatestDAG] val task4 = task3.copy(id = 4) - client.send(dagManager, ReplaceProcessor(task3.id, task4, false)) + client.send(dagManager, ReplaceProcessor(task3.id, task4, inheritConf = false)) client.expectMsgType[DAGOperationFailed] client.send(dagManager, NewDAGDeployed(newDag.version)) - client.send(dagManager, ReplaceProcessor(task3.id, task4, false)) + client.send(dagManager, ReplaceProcessor(task3.id, task4, inheritConf = false)) client.expectMsg(DAGOperationSuccess) } "retrieve last stored dag properly" in { val store = new Store - val newGraph = Graph(task1 ~ hash ~> task2 ~> task2) + val newGraph = Graph(task1 ~ hash ~> task2) val newDag = DAG(newGraph) - store.put(StreamApplication.DAG, newDag) - val dagManager = system.actorOf(Props(new DagManager(appId, userConfig, store, Some(dag)))) + val dagManager = system.actorOf(Props(new DagManager(appId, userConfig, store, Some(newDag)))) + dagManager ! NewDAGDeployed(0) val client = TestProbe() client.send(dagManager, GetLatestDAG) - client.expectMsg(LatestDAG(newDag)) + client.expectMsgType[LatestDAG].dag shouldBe newDag } } - override def afterAll { + override def afterAll(): Unit = { system.terminate() Await.result(system.whenTerminated, Duration.Inf) } - override def beforeAll { + override def beforeAll(): Unit = { this.system = ActorSystem("DagManagerSpec", TestUtil.DEFAULT_CONFIG) } }
