Repository: incubator-gearpump Updated Branches: refs/heads/master baa7dead9 -> 8939a2d42
fix GEARPUMP-155, integration test failure Author: manuzhang <[email protected]> Closes #40 from manuzhang/it_failure. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/8939a2d4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/8939a2d4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/8939a2d4 Branch: refs/heads/master Commit: 8939a2d42138065be9c760fb94807bf77f0198a2 Parents: baa7dea Author: manuzhang <[email protected]> Authored: Thu Jun 16 09:21:10 2016 +0800 Committer: manuzhang <[email protected]> Committed: Thu Jun 16 09:21:10 2016 +0800 ---------------------------------------------------------------------- .../gearpump/integrationtest/storm/StormClient.scala | 4 ++-- .../apache/gearpump/streaming/appmaster/DagManager.scala | 6 +++--- .../gearpump/streaming/appmaster/DagManagerSpec.scala | 11 ++++------- 3 files changed, 9 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/8939a2d4/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala index 79adfc4..5a58782 100644 --- a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala +++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala @@ -68,9 +68,9 @@ class StormClient(cluster: MiniCluster, restClient: RestClient) { } def submitStormApp(jar: String, mainClass: String, args: String, appName: String): Int = { + Docker.executeSilently(NIMBUS_HOST, s"$STORM_APP -config $CONFIG_FILE " + + s"-jar $jar $mainClass $args") Util.retryUntil(() => { - Docker.executeSilently(NIMBUS_HOST, s"$STORM_APP -config $CONFIG_FILE " + - s"-jar $jar $mainClass $args") restClient.listRunningApps().exists(_.appName == appName) }, "app running") restClient.listRunningApps().filter(_.appName == appName).head.appId http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/8939a2d4/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala index d6703e4..2736f5e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala @@ -19,9 +19,9 @@ package org.apache.gearpump.streaming.appmaster import akka.actor.{ExtendedActorSystem, Actor, ActorRef, Stash} +import akka.serialization.JavaSerializer import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.partitioner.PartitionerDescription -import org.apache.gearpump.romix.serialization.kryo.KryoSerializerWrapper import org.apache.gearpump.streaming._ import org.apache.gearpump.streaming.appmaster.DagManager._ import org.apache.gearpump.streaming.storage.AppDataStore @@ -49,13 +49,13 @@ class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: O private implicit val system = context.system private var watchers = List.empty[ActorRef] - private val serializer = new KryoSerializerWrapper(system.asInstanceOf[ExtendedActorSystem]) + private val serializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem]) override def receive: Receive = null override def preStart(): Unit = { LOG.info("Initializing Dag Service, get stored Dag ....") - store.get(StreamApplication.DAG).asInstanceOf[Future[Array[Byte]]].map { bytes => + store.get(StreamApplication.DAG).asInstanceOf[Future[Array[Byte]]].foreach{ bytes => if (bytes != null) { val storedDag = serializer.fromBinary(bytes).asInstanceOf[DAG] dags :+= storedDag http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/8939a2d4/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala index a8d1ff5..be3b3b7 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala @@ -18,26 +18,23 @@ package org.apache.gearpump.streaming.appmaster -import java.util.concurrent.TimeUnit -import akka.actor.{ExtendedActorSystem, ActorSystem, Props} +import akka.actor.{ActorSystem, Props} import akka.testkit.TestProbe import org.apache.gearpump.cluster.{TestUtil, UserConfig} import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} -import org.apache.gearpump.romix.serialization.kryo.KryoSerializerWrapper import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationFailed, DAGOperationSuccess, GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, ReplaceProcessor, TaskLaunchData, WatchChange} import org.apache.gearpump.streaming.task.{Subscriber, TaskActor} import org.apache.gearpump.streaming._ import org.apache.gearpump.util.Graph import org.apache.gearpump.util.Graph._ -import org.mockito.Mockito._ import org.scalatest.mock.MockitoSugar import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} -import scala.concurrent.{Promise, Future, Await} -import scala.concurrent.duration.{FiniteDuration, Duration} +import scala.concurrent.Await +import scala.concurrent.duration.Duration -class DagManagerSpec extends WordSpecLike with Matchers with MockitoSugar with BeforeAndAfterAll { +class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll { val hash = Partitioner[HashPartitioner] val task1 = ProcessorDescription(id = 1, taskClass = classOf[TaskActor].getName, parallelism = 1)
