Repository: incubator-gearpump
Updated Branches:
  refs/heads/master baa7dead9 -> 8939a2d42


fix GEARPUMP-155, integration test failure

Author: manuzhang <[email protected]>

Closes #40 from manuzhang/it_failure.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/8939a2d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/8939a2d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/8939a2d4

Branch: refs/heads/master
Commit: 8939a2d42138065be9c760fb94807bf77f0198a2
Parents: baa7dea
Author: manuzhang <[email protected]>
Authored: Thu Jun 16 09:21:10 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Thu Jun 16 09:21:10 2016 +0800

----------------------------------------------------------------------
 .../gearpump/integrationtest/storm/StormClient.scala     |  4 ++--
 .../apache/gearpump/streaming/appmaster/DagManager.scala |  6 +++---
 .../gearpump/streaming/appmaster/DagManagerSpec.scala    | 11 ++++-------
 3 files changed, 9 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/8939a2d4/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala
 
b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala
index 79adfc4..5a58782 100644
--- 
a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala
+++ 
b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala
@@ -68,9 +68,9 @@ class StormClient(cluster: MiniCluster, restClient: 
RestClient) {
   }
 
   def submitStormApp(jar: String, mainClass: String, args: String, appName: 
String): Int = {
+    Docker.executeSilently(NIMBUS_HOST, s"$STORM_APP -config $CONFIG_FILE " +
+      s"-jar $jar $mainClass $args")
     Util.retryUntil(() => {
-      Docker.executeSilently(NIMBUS_HOST, s"$STORM_APP -config $CONFIG_FILE " +
-        s"-jar $jar $mainClass $args")
       restClient.listRunningApps().exists(_.appName == appName)
     }, "app running")
     restClient.listRunningApps().filter(_.appName == appName).head.appId

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/8939a2d4/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
index d6703e4..2736f5e 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
@@ -19,9 +19,9 @@
 package org.apache.gearpump.streaming.appmaster
 
 import akka.actor.{ExtendedActorSystem, Actor, ActorRef, Stash}
+import akka.serialization.JavaSerializer
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.partitioner.PartitionerDescription
-import org.apache.gearpump.romix.serialization.kryo.KryoSerializerWrapper
 import org.apache.gearpump.streaming._
 import org.apache.gearpump.streaming.appmaster.DagManager._
 import org.apache.gearpump.streaming.storage.AppDataStore
@@ -49,13 +49,13 @@ class DagManager(appId: Int, userConfig: UserConfig, store: 
AppDataStore, dag: O
   private implicit val system = context.system
 
   private var watchers = List.empty[ActorRef]
-  private val serializer = new 
KryoSerializerWrapper(system.asInstanceOf[ExtendedActorSystem])
+  private val serializer = new 
JavaSerializer(system.asInstanceOf[ExtendedActorSystem])
 
   override def receive: Receive = null
 
   override def preStart(): Unit = {
     LOG.info("Initializing Dag Service, get stored Dag ....")
-    store.get(StreamApplication.DAG).asInstanceOf[Future[Array[Byte]]].map { 
bytes =>
+    
store.get(StreamApplication.DAG).asInstanceOf[Future[Array[Byte]]].foreach{ 
bytes =>
       if (bytes != null) {
         val storedDag = serializer.fromBinary(bytes).asInstanceOf[DAG]
         dags :+= storedDag

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/8939a2d4/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
index a8d1ff5..be3b3b7 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
@@ -18,26 +18,23 @@
 
 package org.apache.gearpump.streaming.appmaster
 
-import java.util.concurrent.TimeUnit
 
-import akka.actor.{ExtendedActorSystem, ActorSystem, Props}
+import akka.actor.{ActorSystem, Props}
 import akka.testkit.TestProbe
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
-import org.apache.gearpump.romix.serialization.kryo.KryoSerializerWrapper
 import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationFailed, 
DAGOperationSuccess, GetLatestDAG, GetTaskLaunchData, LatestDAG, 
NewDAGDeployed, ReplaceProcessor, TaskLaunchData, WatchChange}
 import org.apache.gearpump.streaming.task.{Subscriber, TaskActor}
 import org.apache.gearpump.streaming._
 import org.apache.gearpump.util.Graph
 import org.apache.gearpump.util.Graph._
-import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 
-import scala.concurrent.{Promise, Future, Await}
-import scala.concurrent.duration.{FiniteDuration, Duration}
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
 
-class DagManagerSpec extends WordSpecLike with Matchers with MockitoSugar with 
BeforeAndAfterAll {
+class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll 
{
 
   val hash = Partitioner[HashPartitioner]
   val task1 = ProcessorDescription(id = 1, taskClass = 
classOf[TaskActor].getName, parallelism = 1)

Reply via email to