Repository: incubator-gearpump Updated Branches: refs/heads/master 2334c19ee -> 6bef69cd7
[GEARPUMP-326] Upgrade Akka to 2.5.13 1. Upgrade Akka 2. Remove akkastream module temporarily Author: manuzhang <[email protected]> Closes #250 from manuzhang/upgrade_akka_2.5.13. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/6bef69cd Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/6bef69cd Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/6bef69cd Branch: refs/heads/master Commit: 6bef69cd71febb5f06327f528704850f0134687e Parents: 2334c19 Author: manuzhang <[email protected]> Authored: Mon Jun 25 21:45:39 2018 +0800 Committer: manuzhang <[email protected]> Committed: Mon Jun 25 21:46:24 2018 +0800 ---------------------------------------------------------------------- .../gearpump/cluster/main/AppSubmitter.scala | 1 - .../org/apache/gearpump/cluster/main/Gear.scala | 5 +--- .../cluster/master/InMemoryKVService.scala | 28 +++++++++++--------- .../apache/gearpump/jarstore/FileServer.scala | 9 ++++--- project/BuildExperiments.scala | 2 +- project/Dependencies.scala | 7 +++-- project/Pack.scala | 12 ++++++--- 7 files changed, 34 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala index defd86e..4a43d2d 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala @@ -21,7 +21,6 @@ import java.io.File import java.net.{URL, URLClassLoader} import java.util.jar.JarFile -import org.apache.gearpump.cluster.client.RuntimeEnvironment import org.apache.gearpump.util.{Constants, LogUtil, MasterClientCommand, Util} import scala.util.{Failure, Success, Try} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala index 7d6181f..a835350 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala @@ -17,10 +17,7 @@ */ package org.apache.gearpump.cluster.main -import org.apache.gearpump.cluster.ClusterConfig -import org.apache.gearpump.cluster.client.{RemoteRuntimeEnvironment, RuntimeEnvironment} -import org.apache.gearpump.util.LogUtil.ProcessType -import org.apache.gearpump.util.{Constants, LogUtil} +import org.apache.gearpump.util.Constants object Gear { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala index fd19bad..9bcce6f 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala @@ -40,15 +40,15 @@ class InMemoryKVService extends Actor with Stash { private val LOG: Logger = LogUtil.getLogger(getClass) private val replicator = DistributedData(context.system).replicator - private implicit val cluster = Cluster(context.system) + private implicit val cluster: Cluster = Cluster(context.system) // Optimize write path, we can tolerate one master down for recovery. private val timeout = Duration(15, TimeUnit.SECONDS) private val readMajority = ReadMajority(timeout) private val writeMajority = WriteMajority(timeout) - private def groupKey(group: String): LWWMapKey[Any] = { - LWWMapKey[Any](KV_SERVICE + "_" + group) + private def groupKey(group: String): LWWMapKey[Any, Any] = { + LWWMapKey[Any, Any](KV_SERVICE + "_" + group) } def receive: Receive = kvService @@ -58,14 +58,15 @@ class InMemoryKVService extends Actor with Stash { case GetKV(group: String, key: String) => val request = Request(sender(), key) replicator ! Get(groupKey(group), readMajority, Some(request)) - case success@GetSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) => + case success@GetSuccess(group: LWWMapKey[Any @unchecked, Any @unchecked], + Some(request: Request)) => val appData = success.get(group) LOG.info(s"Successfully retrived group: ${group.id}") request.client ! GetKVSuccess(request.key, appData.get(request.key).orNull) - case NotFound(group: LWWMapKey[Any @unchecked], Some(request: Request)) => + case NotFound(group: LWWMapKey[Any @unchecked, Any @unchecked], Some(request: Request)) => LOG.info(s"We cannot find group $group") request.client ! GetKVSuccess(request.key, null) - case GetFailure(group: LWWMapKey[Any @unchecked], Some(request: Request)) => + case GetFailure(_, Some(request: Request)) => val error = s"Failed to get application data, the request key is ${request.key}" LOG.error(error) request.client ! GetKVFailed(new Exception(error)) @@ -76,20 +77,21 @@ class InMemoryKVService extends Actor with Stash { map + (key -> value) } replicator ! update - case UpdateSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) => + case UpdateSuccess(_, Some(request: Request)) => request.client ! PutKVSuccess - case ModifyFailure(group: LWWMapKey[Any @unchecked], error, cause, Some(request: Request)) => + case ModifyFailure(_, error, cause, + Some(request: Request)) => request.client ! PutKVFailed(request.key, new Exception(error, cause)) - case UpdateTimeout(group: LWWMapKey[Any @unchecked], Some(request: Request)) => + case UpdateTimeout(_, Some(request: Request)) => request.client ! PutKVFailed(request.key, new TimeoutException()) - case delete@DeleteKVGroup(group: String) => + case DeleteKVGroup(group: String) => replicator ! Delete(groupKey(group), writeMajority) - case DeleteSuccess(group) => + case DeleteSuccess(group, _) => LOG.info(s"KV Group ${group.id} is deleted") - case ReplicationDeleteFailure(group) => + case ReplicationDeleteFailure(group, _) => LOG.error(s"Failed to delete KV Group ${group.id}...") - case DataDeleted(group) => + case DataDeleted(group, _) => LOG.error(s"Group ${group.id} is deleted, you can no longer put/get/delete this group...") } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala b/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala index 8c1d19a..4bcc3a5 100644 --- a/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala +++ b/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala @@ -18,8 +18,10 @@ package org.apache.gearpump.jarstore import java.io.File -import scala.concurrent.{ExecutionContext, Future} +import akka.Done + +import scala.concurrent.{ExecutionContext, Future} import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.Http.ServerBinding @@ -33,7 +35,6 @@ import akka.stream.ActorMaterializer import akka.stream.scaladsl.{FileIO, Sink, Source} import spray.json.DefaultJsonProtocol._ import spray.json.JsonFormat - import org.apache.gearpump.jarstore.FileDirective._ import org.apache.gearpump.jarstore.FileServer.Port @@ -84,14 +85,14 @@ class FileServer(system: ActorSystem, host: String, port: Int = 0, jarStore: Jar } } - private var connection: Future[ServerBinding] = null + private var connection: Future[ServerBinding] = _ def start: Future[Port] = { connection = Http().bindAndHandle(Route.handlerFlow(route), host, port) connection.map(address => Port(address.localAddress.getPort)) } - def stop: Future[Unit] = { + def stop: Future[Done] = { connection.flatMap(_.unbind()) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/project/BuildExperiments.scala ---------------------------------------------------------------------- diff --git a/project/BuildExperiments.scala b/project/BuildExperiments.scala index 84c80f0..765bedd 100644 --- a/project/BuildExperiments.scala +++ b/project/BuildExperiments.scala @@ -25,7 +25,7 @@ import sbt.Keys._ object BuildExperiments extends sbt.Build { lazy val experiments: Seq[ProjectReference] = Seq( - akkastream, + // akkastream, cgroup, redis, storm, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/project/Dependencies.scala ---------------------------------------------------------------------- diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 06d2781..627138d 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -23,8 +23,8 @@ object Dependencies { val crossScalaVersionNumbers = Seq("2.11.8") val scalaVersionNumber = crossScalaVersionNumbers.last - val akkaVersion = "2.4.16" - val akkaHttpVersion = "10.0.1" + val akkaVersion = "2.5.13" + val akkaHttpVersion = "10.1.3" val hadoopVersion = "2.6.0" val hbaseVersion = "1.0.0" val commonsHttpVersion = "3.1" @@ -89,11 +89,10 @@ object Dependencies { "com.typesafe.akka" %% "akka-cluster" % akkaVersion, "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion, "commons-logging" % "commons-logging" % commonsLoggingVersion, - "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion, + "com.typesafe.akka" %% "akka-distributed-data" % akkaVersion, "com.typesafe.akka" %% "akka-actor" % akkaVersion, "com.typesafe.akka" %% "akka-agent" % akkaVersion, "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, - "com.typesafe.akka" %% "akka-kernel" % akkaVersion, "com.typesafe.akka" %% "akka-http" % akkaHttpVersion, "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion, "org.scala-lang" % "scala-reflect" % scalaVersionNumber, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/project/Pack.scala ---------------------------------------------------------------------- diff --git a/project/Pack.scala b/project/Pack.scala index 5e546b2..568d7f3 100644 --- a/project/Pack.scala +++ b/project/Pack.scala @@ -128,8 +128,8 @@ object Pack extends sbt.Build { "lib/yarn" -> new ProjectsToPack(gearpumpHadoop.id, yarn.id). exclude(services.id, core.id), "lib/services" -> new ProjectsToPack(services.id).exclude(core.id), - "lib/storm" -> new ProjectsToPack(storm.id).exclude(streaming.id), - "lib/akkastream" -> new ProjectsToPack(akkastream.id) + // "lib/akkastream" -> new ProjectsToPack(akkastream.id), + "lib/storm" -> new ProjectsToPack(storm.id).exclude(streaming.id) ), packExclude := Seq(thisProjectRef.value.project), @@ -164,6 +164,12 @@ object Pack extends sbt.Build { packArchiveExcludes := Seq("integrationtest") ) - ).dependsOn(core, streaming, services, yarn, storm, akkastream, cgroup). + ).dependsOn(core, + streaming, + services, + yarn, + storm, + // akkastream, + cgroup). disablePlugins(sbtassembly.AssemblyPlugin) }
