Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 882ea9aa8 -> a7ae62a12


fix GEARPUMP-155, DagManager serializes DAG before sending it to master

Changes include:

1. DagManager serializes the DAG to bytes before saving it to master and
deserializes the bytes back into a DAG when loading from master
2. remove conflicting Storm dependencies

Author: manuzhang <[email protected]>

Closes #38 from manuzhang/GEARPUMP-155.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/a7ae62a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/a7ae62a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/a7ae62a1

Branch: refs/heads/master
Commit: a7ae62a12dce2ce150cdd9d426e8061955367444
Parents: 882ea9a
Author: manuzhang <[email protected]>
Authored: Sun Jun 12 14:06:28 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Sun Jun 12 14:06:28 2016 +0800

----------------------------------------------------------------------
 project/Build.scala                             |  1 +
 .../streaming/appmaster/DagManager.scala        | 17 ++++++----
 .../streaming/appmaster/ClockServiceSpec.scala  | 10 +++---
 .../streaming/appmaster/DagManagerSpec.scala    | 35 +++++++++++---------
 4 files changed, 36 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a7ae62a1/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index 82f4e8f..c9f6356 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -416,6 +416,7 @@ object Build extends sbt.Build {
             exclude("org.mortbay.jetty", "jetty")
             exclude("org.ow2.asm", "asm")
             exclude("org.slf4j", "log4j-over-slf4j")
+            exclude("org.apache.logging.log4j", "log4j-slf4j-impl")
             exclude("ring", "ring-core")
             exclude("ring", "ring-devel")
             exclude("ring", "ring-jetty-adapter")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a7ae62a1/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
index 24f16ad..d6703e4 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
@@ -18,9 +18,10 @@
 
 package org.apache.gearpump.streaming.appmaster
 
-import akka.actor.{Actor, ActorRef, Stash}
+import akka.actor.{ExtendedActorSystem, Actor, ActorRef, Stash}
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.romix.serialization.kryo.KryoSerializerWrapper
 import org.apache.gearpump.streaming._
 import org.apache.gearpump.streaming.appmaster.DagManager._
 import org.apache.gearpump.streaming.storage.AppDataStore
@@ -48,13 +49,15 @@ class DagManager(appId: Int, userConfig: UserConfig, store: 
AppDataStore, dag: O
   private implicit val system = context.system
 
   private var watchers = List.empty[ActorRef]
+  private val serializer = new 
KryoSerializerWrapper(system.asInstanceOf[ExtendedActorSystem])
 
   override def receive: Receive = null
 
   override def preStart(): Unit = {
     LOG.info("Initializing Dag Service, get stored Dag ....")
-    store.get(StreamApplication.DAG).asInstanceOf[Future[DAG]].map { storedDag 
=>
-      if (storedDag != null) {
+    store.get(StreamApplication.DAG).asInstanceOf[Future[Array[Byte]]].map { 
bytes =>
+      if (bytes != null) {
+        val storedDag = serializer.fromBinary(bytes).asInstanceOf[DAG]
         dags :+= storedDag
       } else {
         dags :+= 
dag.getOrElse(DAG(userConfig.getValue[Graph[ProcessorDescription,
@@ -62,7 +65,7 @@ class DagManager(appId: Int, userConfig: UserConfig, store: 
AppDataStore, dag: O
       }
       maxProcessorId = {
         val keys = dags.head.processors.keys
-        if (keys.size == 0) {
+        if (keys.isEmpty) {
           0
         } else {
           keys.max
@@ -96,11 +99,11 @@ class DagManager(appId: Int, userConfig: UserConfig, store: 
AppDataStore, dag: O
     case GetLatestDAG =>
       // Get the latest version of DAG.
       sender ! LatestDAG(dags.last)
-    case GetTaskLaunchData(version, processorId, context) =>
+    case GetTaskLaunchData(version, processorId, launchContext) =>
       // Task information like Processor class, downstream subscriber 
processors and etc.
       dags.find(_.version == version).foreach { dag =>
         LOG.info(s"Get task launcher data for processor: $processorId, 
dagVersion: $version")
-        sender ! taskLaunchData(dag, processorId, context)
+        sender ! taskLaunchData(dag, processorId, launchContext)
       }
     case ReplaceProcessor(oldProcessorId, inputNewProcessor, inheritConfig) =>
       // Replace a processor with new implementation. The upstream processors 
and downstream
@@ -142,7 +145,7 @@ class DagManager(appId: Int, userConfig: UserConfig, store: 
AppDataStore, dag: O
       // deployed. The obsolete dag versions will be removed.
       if (dagVersion != NOT_INITIALIZED) {
         dags = dags.filter(_.version == dagVersion)
-        store.put(StreamApplication.DAG, dags.last)
+        store.put(StreamApplication.DAG, serializer.toBinary(dags.last))
       }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a7ae62a1/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
index 2bb33b7..0729877 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
@@ -42,7 +42,7 @@ class ClockServiceSpec(_system: ActorSystem) extends 
TestKit(_system) with Impli
   val task2 = ProcessorDescription(id = 1, taskClass = 
classOf[TaskActor].getName, parallelism = 1)
   val dag = DAG(Graph(task1 ~ hash ~> task2))
 
-  override def afterAll {
+  override def afterAll(): Unit = {
     TestKit.shutdownActorSystem(system)
   }
 
@@ -102,8 +102,8 @@ class ClockServiceSpec(_system: ActorSystem) extends 
TestKit(_system) with Impli
       clockService.tell(ChangeToNewDAG(dagAddMiddleNode), user.ref)
 
       val clocks = user.expectMsgPF() {
-        case ChangeToNewDAGSuccess(clocks) =>
-          clocks
+        case ChangeToNewDAGSuccess(newDagClocks) =>
+          newDagClocks
       }
 
       // For intermediate task, pick its upstream as initial clock
@@ -129,7 +129,7 @@ class ClockServiceSpec(_system: ActorSystem) extends 
TestKit(_system) with Impli
       clockService ! GetStartClock
       expectMsg(StartClock(200L))
 
-      val conf = UserConfig.empty.withBoolean("state.checkpoint.enable", true)
+      val conf = UserConfig.empty.withBoolean("state.checkpoint.enable", value 
= true)
       val task3 = ProcessorDescription(id = 3, taskClass = 
classOf[TaskActor].getName,
         parallelism = 1, taskConf = conf)
       val task4 = ProcessorDescription(id = 4, taskClass = 
classOf[TaskActor].getName,
@@ -234,7 +234,7 @@ object ClockServiceSpec {
     }
 
     def get(key: String): Future[Any] = {
-      Promise.successful(map.get(key).getOrElse(null)).future
+      Promise.successful(map.getOrElse(key, null)).future
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a7ae62a1/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
index 2caab4f..a8d1ff5 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
@@ -18,21 +18,26 @@
 
 package org.apache.gearpump.streaming.appmaster
 
-import akka.actor.{ActorSystem, Props}
+import java.util.concurrent.TimeUnit
+
+import akka.actor.{ExtendedActorSystem, ActorSystem, Props}
 import akka.testkit.TestProbe
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.romix.serialization.kryo.KryoSerializerWrapper
 import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationFailed, 
DAGOperationSuccess, GetLatestDAG, GetTaskLaunchData, LatestDAG, 
NewDAGDeployed, ReplaceProcessor, TaskLaunchData, WatchChange}
 import org.apache.gearpump.streaming.task.{Subscriber, TaskActor}
-import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, 
StreamApplication}
+import org.apache.gearpump.streaming._
 import org.apache.gearpump.util.Graph
 import org.apache.gearpump.util.Graph._
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
+import scala.concurrent.{Promise, Future, Await}
+import scala.concurrent.duration.{FiniteDuration, Duration}
 
-class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll 
{
+class DagManagerSpec extends WordSpecLike with Matchers with MockitoSugar with 
BeforeAndAfterAll {
 
   val hash = Partitioner[HashPartitioner]
   val task1 = ProcessorDescription(id = 1, taskClass = 
classOf[TaskActor].getName, parallelism = 1)
@@ -65,43 +70,43 @@ class DagManagerSpec extends WordSpecLike with Matchers 
with BeforeAndAfterAll {
       client.send(dagManager, WatchChange(watcher.ref))
       val task3 = task2.copy(id = 3, life = LifeTime(100, Long.MaxValue))
 
-      client.send(dagManager, ReplaceProcessor(task2.id, task3, false))
+      client.send(dagManager, ReplaceProcessor(task2.id, task3, inheritConf = 
false))
       client.expectMsg(DAGOperationSuccess)
 
       client.send(dagManager, GetLatestDAG)
       val newDag = client.expectMsgPF() {
-        case LatestDAG(dag) => dag
+        case LatestDAG(latestDag) => latestDag
       }
       assert(newDag.processors.contains(task3.id))
       watcher.expectMsgType[LatestDAG]
 
       val task4 = task3.copy(id = 4)
-      client.send(dagManager, ReplaceProcessor(task3.id, task4, false))
+      client.send(dagManager, ReplaceProcessor(task3.id, task4, inheritConf = 
false))
       client.expectMsgType[DAGOperationFailed]
 
       client.send(dagManager, NewDAGDeployed(newDag.version))
-      client.send(dagManager, ReplaceProcessor(task3.id, task4, false))
+      client.send(dagManager, ReplaceProcessor(task3.id, task4, inheritConf = 
false))
       client.expectMsg(DAGOperationSuccess)
     }
 
     "retrieve last stored dag properly" in {
       val store = new Store
-      val newGraph = Graph(task1 ~ hash ~> task2 ~> task2)
+      val newGraph = Graph(task1 ~ hash ~> task2)
       val newDag = DAG(newGraph)
-      store.put(StreamApplication.DAG, newDag)
-      val dagManager = system.actorOf(Props(new DagManager(appId, userConfig, 
store, Some(dag))))
+      val dagManager = system.actorOf(Props(new DagManager(appId, userConfig, 
store, Some(newDag))))
+      dagManager ! NewDAGDeployed(0)
       val client = TestProbe()
       client.send(dagManager, GetLatestDAG)
-      client.expectMsg(LatestDAG(newDag))
+      client.expectMsgType[LatestDAG].dag shouldBe newDag
     }
   }
 
-  override def afterAll {
+  override def afterAll(): Unit = {
     system.terminate()
     Await.result(system.whenTerminated, Duration.Inf)
   }
 
-  override def beforeAll {
+  override def beforeAll(): Unit = {
     this.system = ActorSystem("DagManagerSpec", TestUtil.DEFAULT_CONFIG)
   }
 }

Reply via email to