fix #1988, upgrade akka to akka 2.4.2
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/21d59216 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/21d59216 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/21d59216 Branch: refs/heads/master Commit: 21d59216b181186e322738cef751e3bc7abc6a81 Parents: bcdbfc9 Author: Sean Zhong <[email protected]> Authored: Fri Mar 25 00:33:23 2016 +0800 Committer: manuzhang <[email protected]> Committed: Tue Apr 26 14:23:48 2016 +0800 ---------------------------------------------------------------------- .travis.yml | 4 +- conf/gear.conf | 2 +- .../main/java/io/gearpump/util/AkkaHelper.java | 35 +++++++ .../io/gearpump/util/HadoopFSLogAppender.java | 18 ++++ core/src/main/resources/geardefault.conf | 9 ++ .../io/gearpump/transport/netty/Server.scala | 6 +- .../main/scala/io/gearpump/util/ActorUtil.scala | 1 + .../io/gearpump/cluster/DaemonMessage.scala | 2 - .../scala/io/gearpump/cluster/main/Master.scala | 35 ++++--- .../io/gearpump/cluster/master/AppManager.scala | 2 +- .../cluster/master/ClusterReplication.scala | 64 ----------- .../cluster/master/InMemoryKVService.scala | 105 +++++++++++-------- .../io/gearpump/cluster/master/Master.scala | 6 +- .../scala/io/gearpump/util/FileDirective.scala | 12 ++- .../scala/io/gearpump/util/FileServer.scala | 15 ++- .../cluster/main/MasterWatcherSpec.scala | 3 +- .../cluster/master/InMemoryKVServiceSpec.scala | 7 +- .../yarn/appmaster/YarnAppMaster.scala | 4 +- .../yarn/client/AppMasterResolver.scala | 4 +- .../checklist/MessageDeliverySpec.scala | 2 +- project/Build.scala | 65 +++++++----- project/Pack.scala | 4 +- project/plugins.sbt | 6 +- .../gearpump/services/MasterServiceSpec.scala | 6 +- .../gearpump/services/SecurityServiceSpec.scala | 16 +-- ...CloudFoundryUAAOAuth2AuthenticatorSpec.scala | 14 +-- .../oauth2/GoogleOAuth2AuthenticatorSpec.scala | 14 +-- .../streaming/task/ExpressTransport.scala | 4 +- .../streaming/appmaster/DagManagerSpec.scala | 2 +- .../gearpump/streaming/dsl/StreamAppSpec.scala | 1 + .../2.10/akka-actor_2.10-2.3.12-fix-1816.jar | Bin 2815968 -> 0 bytes .../2.11/akka-actor_2.11-2.3.12-fix-1816.jar | Bin 2790862 -> 0 bytes 32 files changed, 248 insertions(+), 220 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 771f652..5b6b80f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,9 +19,9 @@ script: then sbt -jvm-opts project/travis/jvmopts clean +assembly +packArchiveZip | grep -v -E "$skipLogs"; fi jdk: -- oraclejdk7 +- oraclejdk8 scala: -- 2.11.5 +- 2.11.8 cache: directories: - $HOME/.m2 http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/conf/gear.conf ---------------------------------------------------------------------- diff --git a/conf/gear.conf b/conf/gear.conf index 1ab4bb6..d8e8b4c 100644 --- a/conf/gear.conf +++ b/conf/gear.conf @@ -320,7 +320,7 @@ gearpump { ### Configuration only visible to master nodes.. gearpump-master { extensions = [ - "akka.contrib.datareplication.DataReplication$" + "akka.cluster.ddata.DistributedData$" ] akka { ######################################### http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/core/src/main/java/io/gearpump/util/AkkaHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/io/gearpump/util/AkkaHelper.java b/core/src/main/java/io/gearpump/util/AkkaHelper.java new file mode 100644 index 0000000..f4772c1 --- /dev/null +++ b/core/src/main/java/io/gearpump/util/AkkaHelper.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.gearpump.util; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; + +public class AkkaHelper { + + /** + * Helper util to access the private[akka] system.actorFor method + * + * This is used for performance optimization, we encode the session Id + * in the ActorRef path. Session Id is used to identity sender Task. + */ + public static ActorRef actorFor(ActorSystem system, String path) { + return system.actorFor(path); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java b/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java index a736f9c..ab7ee5e 100644 --- a/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java +++ b/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.gearpump.util; import org.apache.log4j.RollingFileAppender; http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/core/src/main/resources/geardefault.conf ---------------------------------------------------------------------- diff --git a/core/src/main/resources/geardefault.conf b/core/src/main/resources/geardefault.conf index 17626a3..394a928 100644 --- a/core/src/main/resources/geardefault.conf +++ b/core/src/main/resources/geardefault.conf @@ -96,6 +96,7 @@ gearpump { ### If you want to use metrics, please change ########################### + ### Flag to enable metrics metrics { enabled = false @@ -572,6 +573,14 @@ akka { actor { provider = "akka.remote.RemoteActorRefProvider" + ## Doesn't warn on Java serializer usage + ## + ## Most of our streaming message are using custom serializer, with a few + ## exception on system control message. The volume of system control + ## message should be small. So, turn this flag off until further benchmark + ## shows a different result. + warn-about-java-serializer-usage = false + ## TODO: in integration test, may need to enable this ##creation-timeout=100s default-mailbox { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/core/src/main/scala/io/gearpump/transport/netty/Server.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/transport/netty/Server.scala b/core/src/main/scala/io/gearpump/transport/netty/Server.scala index c592b31..dde0861 100644 --- a/core/src/main/scala/io/gearpump/transport/netty/Server.scala +++ b/core/src/main/scala/io/gearpump/transport/netty/Server.scala @@ -22,7 +22,7 @@ import java.util import akka.actor.{Actor, ActorContext, ActorRef, ExtendedActorSystem} import io.gearpump.transport.ActorLookupById -import io.gearpump.util.LogUtil +import io.gearpump.util.{LogUtil, AkkaHelper} import org.jboss.netty.channel._ import org.jboss.netty.channel.group.{ChannelGroup, DefaultChannelGroup} import org.slf4j.Logger @@ -116,7 +116,9 @@ object Server { def translateToActorRef(sessionId : Int): ActorRef = { if(!taskIdtoActorRef.contains(sessionId)){ - val actorRef = context.system.actorFor(s"/session#$sessionId") + + // A fake ActorRef for performance optimization. + val actorRef = AkkaHelper.actorFor(context.system, s"/session#$sessionId") taskIdtoActorRef += sessionId -> actorRef } taskIdtoActorRef.get(sessionId).get http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/core/src/main/scala/io/gearpump/util/ActorUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/ActorUtil.scala b/core/src/main/scala/io/gearpump/util/ActorUtil.scala index b63733d..8233b28 100644 --- a/core/src/main/scala/io/gearpump/util/ActorUtil.scala +++ b/core/src/main/scala/io/gearpump/util/ActorUtil.scala @@ -87,6 +87,7 @@ object ActorUtil { def launchExecutorOnEachWorker(master: ActorRef, executorJvmConfig: ExecutorSystemJvmConfig, sender: ActorRef)(implicit executor : scala.concurrent.ExecutionContext) = { implicit val timeout = Constants.FUTURE_TIMEOUT + (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]].map { list => val resources = list.workers.map { workerId => ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala index 36d84c0..19ac620 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala @@ -20,8 +20,6 @@ package io.gearpump.cluster import akka.actor.ActorRef import io.gearpump.cluster.master.Master.MasterInfo import io.gearpump.cluster.scheduler.Resource -import io.gearpump.cluster.master.Master.MasterInfo -import io.gearpump.cluster.scheduler.Resource /** * Cluster Bootup Flow http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala index c89ab1e..8d4515a 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala @@ -22,9 +22,9 @@ import java.util.concurrent.TimeUnit import akka.actor._ import akka.cluster.ClusterEvent._ +import akka.cluster.ddata.DistributedData import akka.cluster.{Cluster, Member, MemberStatus} -import akka.contrib.datareplication.DataReplication -import akka.contrib.pattern.{ClusterSingletonManager, ClusterSingletonProxy} +import akka.cluster.singleton.{ClusterSingletonManagerSettings, ClusterSingletonProxySettings, ClusterSingletonManager, ClusterSingletonProxy} import com.typesafe.config.ConfigValueFactory import io.gearpump.cluster.ClusterConfig import io.gearpump.cluster.master.{Master => MasterActor} @@ -85,26 +85,28 @@ object Master extends AkkaApp with ArgumentsParser { LOG.info(s"Starting Master Actor system $ip:$port, master list: ${masters.mkString(";")}") val system = ActorSystem(MASTER, masterConfig) - val replicator = DataReplication(system).replicator + val replicator = DistributedData(system).replicator LOG.info(s"Replicator path: ${replicator.path}") - //start master proxy - val masterProxy = system.actorOf(ClusterSingletonProxy.props( - singletonPath = s"/user/${SINGLETON_MANAGER}/${MASTER_WATCHER}/${MASTER}", - role = Some(MASTER)), - name = MASTER) - //start singleton manager val singletonManager = system.actorOf(ClusterSingletonManager.props( - singletonProps = Props(classOf[MasterWatcher], MASTER, masterProxy), - singletonName = MASTER_WATCHER, + singletonProps = Props(classOf[MasterWatcher], MASTER), terminationMessage = PoisonPill, - role = Some(MASTER)), + settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER).withRole(MASTER)), name = SINGLETON_MANAGER) + //start master proxy + val masterProxy = system.actorOf(ClusterSingletonProxy.props( + singletonManagerPath = s"/user/${SINGLETON_MANAGER}", + // The effective singleton is s"${MASTER_WATCHER}/$MASTER" instead of s"${MASTER_WATCHER}". + // Master will only be created when there is a majority of machines started. + settings = ClusterSingletonProxySettings(system).withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)), + name = MASTER + ) + LOG.info(s"master proxy is started at ${masterProxy.path}") - val mainThread = Thread.currentThread(); + val mainThread = Thread.currentThread() Runtime.getRuntime().addShutdownHook(new Thread() { override def run() : Unit = { if (!system.isTerminated) { @@ -120,16 +122,16 @@ object Master extends AkkaApp with ArgumentsParser { case ex : Exception => //ignore } system.shutdown() - mainThread.join(); + mainThread.join() } } - }); + }) system.awaitTermination() } } -class MasterWatcher(role: String, masterProxy : ActorRef) extends Actor with ActorLogging { +class MasterWatcher(role: String) extends Actor with ActorLogging { import context.dispatcher val cluster = Cluster(context.system) @@ -192,7 +194,6 @@ class MasterWatcher(role: String, masterProxy : ActorRef) extends Actor with Ac def waitForShutdown : Receive = { case MasterWatcher.Shutdown => { - context.system.stop(masterProxy) cluster.unsubscribe(self) cluster.leave(cluster.selfAddress) context.stop(self) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala b/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala index 06ab8bb..e6bd1db 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala @@ -241,7 +241,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch (kvService ? PutKV(appId.toString, key, value)).asInstanceOf[Future[PutKVResult]].map { case PutKVSuccess => client ! AppDataSaved - case PutKVFailed(k, v, ex) => + case PutKVFailed(k, ex) => client ! SaveAppDataFailed } case GetAppData(appId, key) => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/cluster/master/ClusterReplication.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/ClusterReplication.scala b/daemon/src/main/scala/io/gearpump/cluster/master/ClusterReplication.scala deleted file mode 100644 index faeccb3..0000000 --- a/daemon/src/main/scala/io/gearpump/cluster/master/ClusterReplication.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.master - -import java.util.concurrent.TimeUnit - -import akka.actor._ -import akka.cluster.Cluster -import akka.contrib.datareplication.Replicator._ -import akka.contrib.datareplication.{DataReplication, GSet} -import io.gearpump.util.Constants -import io.gearpump.util.Constants._ -import io.gearpump.util._ -import org.slf4j.Logger - -import scala.concurrent.duration.Duration - -/** - * ClusterReplication use [[DataReplication]] to store replicated state. - */ -trait ClusterReplication extends Actor with Stash { - - val LOG: Logger = LogUtil.getLogger(getClass) - val systemconfig = context.system.settings.config - - implicit val executionContext = context.dispatcher - implicit val cluster = Cluster(context.system) - - val TIMEOUT = Duration(5, TimeUnit.SECONDS) - val STATE = "masterstate" - val KVService = "kvService" - implicit val timeout = Constants.FUTURE_TIMEOUT - - val replicator = DataReplication(context.system).replicator - - val masterClusterSize = Math.max(1, systemconfig.getStringList(GEARPUMP_CLUSTER_MASTERS).size()) - - //optimize write path, we can tolerate one master down for recovery. - val writeQuorum = Math.min(2, masterClusterSize / 2 + 1) - val readQuorum = masterClusterSize + 1 - writeQuorum - - def stateChangeListener : Receive = { - case update: UpdateResponse => - LOG.debug(s"we get update $update") - case Changed(STATE, data: GSet) => - LOG.info("master state updated ") - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala b/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala index e31ed89..fb66a0c 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala @@ -18,65 +18,78 @@ package io.gearpump.cluster.master -import akka.actor._ -import akka.contrib.datareplication.LWWMap -import akka.contrib.datareplication.Replicator._ -import akka.pattern.ask +import java.util.concurrent.TimeUnit -import scala.concurrent.Future +import akka.actor._ +import akka.cluster.Cluster +import akka.cluster.ddata.{DistributedData, LWWMap, Key, LWWMapKey} +import akka.cluster.ddata.Replicator._ +import io.gearpump.util.{LogUtil} +import org.slf4j.Logger +import scala.concurrent.TimeoutException +import scala.concurrent.duration.Duration /** * A replicated simple in-memory KV service. */ -class InMemoryKVService extends Actor with Stash with ClusterReplication { +class InMemoryKVService extends Actor with Stash { import InMemoryKVService._ - def receive : Receive = kvService orElse stateChangeListener + private val KV_SERVICE = "gearpump_kvservice" - override def preStart(): Unit = { - replicator ! Subscribe(STATE, self) - } + private val LOG: Logger = LogUtil.getLogger(getClass) + private val replicator = DistributedData(context.system).replicator + private implicit val cluster = Cluster(context.system) + + //optimize write path, we can tolerate one master down for recovery. + private val timeout = Duration(15, TimeUnit.SECONDS) + private val readMajority = ReadMajority(timeout) + private val writeMajority = WriteMajority(timeout) - override def postStop(): Unit = { - replicator ! Unsubscribe(STATE, self) + private def groupKey(group: String): LWWMapKey[Any] = { + LWWMapKey[Any](KV_SERVICE + "_" + group) } + + def receive : Receive = kvService + def kvService : Receive = { + case GetKV(group: String, key : String) => - val client = sender - (replicator ? new Get(KVService + group, ReadFrom(readQuorum), TIMEOUT, None)).asInstanceOf[Future[GetResponse]].map { - case GetSuccess(_, appData: LWWMap, _) => - LOG.info(s"Successfully retrived group: $group") - client ! GetKVSuccess(key, appData.get(key).orNull) - case x: NotFound => - LOG.info(s"We cannot find group $group") - client ! GetKVSuccess(key, null) - case x : GetFailure => - LOG.error(s"Failed to get application $key data, the request key is $key") - client ! GetKVFailed(new Exception(x.getClass.getName)) - case GetSuccess(_, x, _) => - LOG.error(s"Got unexpected response when get key $key, the response is $x") - client ! GetKVFailed(new Exception(x.getClass.getName)) - } + val request = Request(sender(), key) + replicator ! Get(groupKey(group), readMajority, Some(request)) + case success@ GetSuccess(group: LWWMapKey[Any], Some(request: Request)) => + val appData = success.get(group) + LOG.info(s"Successfully retrived group: ${group.id}") + request.client ! GetKVSuccess(request.key, appData.get(request.key).orNull) + case NotFound(group: LWWMapKey[Any], Some(request: Request)) => + LOG.info(s"We cannot find group $group") + request.client ! GetKVSuccess(request.key, null) + case GetFailure(group: LWWMapKey[Any], Some(request: Request)) => + val error = s"Failed to get application data, the request key is ${request.key}" + LOG.error(error) + request.client ! GetKVFailed(new Exception(error)) case PutKV(group: String, key: String, value: Any) => - val client = sender - - val update = Update(KVService + group, LWWMap(), - WriteTo(writeQuorum), TIMEOUT) {map => + val request = Request(sender(), key) + val update = Update(groupKey(group), LWWMap(), writeMajority, Some(request)) {map => map + (key -> value) } - - val putFuture = (replicator ? update).asInstanceOf[Future[UpdateResponse]] - - putFuture.map { - case UpdateSuccess(key, _) => - client ! PutKVSuccess - case fail: UpdateFailure => - client ! PutKVFailed(key, value, new Exception(fail.getClass.getName)) - } - case DeleteKVGroup(group: String) => - val client = sender - replicator ? Update(KVService + group, LWWMap(), WriteTo(writeQuorum), TIMEOUT)( _ => LWWMap()) + replicator ! update + case UpdateSuccess(group: LWWMapKey[Any], Some(request: Request)) => + request.client ! PutKVSuccess + case ModifyFailure(group: LWWMapKey[Any], error, cause, Some(request: Request)) => + request.client ! PutKVFailed(request.key, new Exception(error, cause)) + case UpdateTimeout(group: LWWMapKey[Any], Some(request: Request)) => + request.client ! PutKVFailed(request.key, new TimeoutException()) + + case delete@ DeleteKVGroup(group: String) => + replicator ! Delete(groupKey(group), writeMajority) + case DeleteSuccess(group) => + LOG.info(s"KV Group ${group.id} is deleted") + case ReplicationDeleteFailure(group) => + LOG.error(s"Failed to delete KV Group ${group.id}...") + case DataDeleted(group) => + LOG.error(s"Group ${group.id} is deleted, you can no longer put/get/delete this group...") } } @@ -96,9 +109,13 @@ object InMemoryKVService { case class DeleteKVGroup(group: String) + case class GroupDeleted(group: String) extends GetKVResult with PutKVResult + trait PutKVResult case object PutKVSuccess extends PutKVResult - case class PutKVFailed(key: String, value: Any, ex: Throwable) extends PutKVResult + case class PutKVFailed(key: String, ex: Throwable) extends PutKVResult + + case class Request(client: ActorRef, key: String) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala index a4d9b54..f22a300 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala @@ -156,8 +156,8 @@ private[cluster] class Master extends Actor with Stash { def kvServiceMsgHandler: Receive = { case PutKVSuccess => //Skip - case PutKVFailed(key, value, exception) => - LOG.error(s"Put value $value with key $key to InMemoryKVService failed.\n" + ExceptionUtils.getStackTrace(exception)) + case PutKVFailed(key, exception) => + LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" + ExceptionUtils.getStackTrace(exception)) } def metricsService : Receive = { @@ -286,7 +286,7 @@ private[cluster] class Master extends Actor with Stash { } object Master { - final val MASTER_GROUP = "-1" + final val MASTER_GROUP = "master_group" final val WORKER_ID = "next_worker_id" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/util/FileDirective.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/util/FileDirective.scala b/daemon/src/main/scala/io/gearpump/util/FileDirective.scala index b4ff5b3..f4f82fb 100644 --- a/daemon/src/main/scala/io/gearpump/util/FileDirective.scala +++ b/daemon/src/main/scala/io/gearpump/util/FileDirective.scala @@ -25,7 +25,7 @@ import akka.http.scaladsl.model.{HttpEntity, MediaTypes, Multipart} import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server._ import akka.stream.Materializer -import akka.stream.io.{SynchronousFileSink, SynchronousFileSource} +import akka.stream.scaladsl.FileIO import scala.concurrent.{ExecutionContext, Future} @@ -75,6 +75,8 @@ object FileDirective { ctx => { filesFuture.map(map => inner(Tuple1(map))).flatMap(route => route(ctx)) } + + } } } @@ -88,7 +90,7 @@ object FileDirective { val responseEntity = HttpEntity( MediaTypes.`application/octet-stream`, file.length, - SynchronousFileSource(file, CHUNK_SIZE)) + FileIO.fromFile(file, CHUNK_SIZE)) complete(responseEntity) } @@ -100,10 +102,10 @@ object FileDirective { //reserve the suffix val targetPath = File.createTempFile(s"userfile_${p.name}_", s"${p.filename.getOrElse("")}", rootDirectory) - val written = p.entity.dataBytes.runWith(SynchronousFileSink(targetPath)) + val written = p.entity.dataBytes.runWith(FileIO.toFile(targetPath)) written.map(written => - if (written > 0) { - Map(p.name -> FileInfo(p.filename.get, targetPath, written)) + if (written.count > 0) { + Map(p.name -> FileInfo(p.filename.get, targetPath, written.count)) } else { Map.empty[Name, FileInfo] }) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/util/FileServer.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/util/FileServer.scala b/daemon/src/main/scala/io/gearpump/util/FileServer.scala index f208465..361c01d 100644 --- a/daemon/src/main/scala/io/gearpump/util/FileServer.scala +++ b/daemon/src/main/scala/io/gearpump/util/FileServer.scala @@ -24,15 +24,14 @@ import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.Http.ServerBinding import akka.http.scaladsl.marshalling.Marshal -import akka.http.scaladsl.model.Uri.Path +import akka.http.scaladsl.model.Uri.{Query, Path} import akka.http.scaladsl.model.{HttpEntity, HttpRequest, MediaTypes, Multipart, _} import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server._ import akka.http.scaladsl.unmarshalling.Unmarshal import akka.stream.ActorMaterializer -import akka.stream.io.{SynchronousFileSink, SynchronousFileSource} import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet -import akka.stream.scaladsl.{Sink, Source} +import akka.stream.scaladsl.{Sink, Source, FileIO} import io.gearpump.jarstore.FilePath import io.gearpump.util.FileDirective._ import io.gearpump.util.FileServer.Port @@ -65,7 +64,7 @@ class FileServer(system: ActorSystem, host: String, port: Int = 0, rootDirectory pathEndOrSingleSlash { extractUri { uri => val upload = uri.withPath(Uri.Path("/upload")).toString() - val entity = HttpEntity(MediaTypes.`text/html`, + val entity = HttpEntity(ContentTypes.`text/html(UTF-8)`, s""" | |<h2>Please specify a file to upload:</h2> @@ -123,7 +122,7 @@ object FileServer { HttpRequest(HttpMethods.POST, uri = target, entity = entity) } - val response = Source(request).via(httpClient).runWith(Sink.head) + val response = Source.fromFuture(request).via(httpClient).runWith(Sink.head) response.flatMap{some => Unmarshal(some).to[String] }.map{path => @@ -132,17 +131,17 @@ object FileServer { } def download(remoteFile: FilePath, saveAs: File): Future[Unit] = { - val downoad = server.withPath(Path("/download")).withQuery("file" -> remoteFile.path) + val downoad = server.withPath(Path("/download")).withQuery(Query("file" -> remoteFile.path)) //download file to local val response = Source.single(HttpRequest(uri = downoad)).via(httpClient).runWith(Sink.head) val downloaded = response.flatMap { response => - response.entity.dataBytes.runWith(SynchronousFileSink(saveAs)) + response.entity.dataBytes.runWith(FileIO.toFile(saveAs)) } downloaded.map(written => Unit) } private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = { - val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(), SynchronousFileSource(file, chunkSize = 100000)) + val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromFile(file, chunkSize = 100000)) val body = Source.single( Multipart.FormData.BodyPart( "uploadfile", http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala index 863d70c..3927993 100644 --- a/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala +++ b/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala @@ -34,9 +34,8 @@ class MasterWatcherSpec extends FlatSpec with Matchers { val system = ActorSystem("ForMasterWatcher", config) val actorWatcher = TestProbe()(system) - val mockMaster = TestProbe()(system) - val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher", mockMaster.ref)) + val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher")) actorWatcher watch masterWatcher actorWatcher.expectTerminated(masterWatcher, 5 seconds) system.shutdown() http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala index 76bfd54..8f60d34 100644 --- a/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala +++ b/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala @@ -23,6 +23,7 @@ import akka.testkit.TestProbe import io.gearpump.cluster.master.InMemoryKVService._ import io.gearpump.cluster.{MasterHarness, TestUtil} import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} +import scala.concurrent.duration._ class InMemoryKVServiceSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { @@ -54,7 +55,11 @@ class InMemoryKVServiceSpec extends FlatSpec with Matchers with BeforeAndAfterEa client.send(kvService, DeleteKVGroup(group)) + // after DeleteGroup, it no longer accept Get and Put client.send(kvService, GetKV(group, "key")) - client.expectMsg(GetKVSuccess("key", null)) + client.expectNoMsg(3 seconds) + + client.send(kvService, PutKV(group, "key", 3)) + client.expectNoMsg(3 seconds) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala index 6cc0e33..f8982ce 100644 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala +++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala @@ -31,7 +31,7 @@ import io.gearpump.experiments.yarn.Constants._ import io.gearpump.experiments.yarn.glue.Records._ import io.gearpump.experiments.yarn.glue.{NMClient, RMClient, YarnConfig} import io.gearpump.transport.HostPort -import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util} +import io.gearpump.util._ import org.apache.commons.httpclient.HttpClient import org.apache.commons.httpclient.methods.GetMethod import org.slf4j.Logger @@ -359,7 +359,7 @@ object YarnAppMaster extends AkkaApp with ArgumentsParser { } if (status == 200) { - system.actorFor(get.getResponseBodyAsString) + AkkaHelper.actorFor(system, get.getResponseBodyAsString) } else { throw new IOException("Fail to resolve AppMaster address, please make sure " + s"${report.getOriginalTrackingUrl} is accessible...") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala index adf1716..9f30570 100644 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala +++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala @@ -23,7 +23,7 @@ import java.io.IOException import akka.actor.{ActorRef, ActorSystem} import io.gearpump.experiments.yarn.glue.Records.ApplicationId import io.gearpump.experiments.yarn.glue.YarnClient -import io.gearpump.util.LogUtil +import io.gearpump.util.{AkkaHelper, LogUtil} import org.apache.commons.httpclient.HttpClient import org.apache.commons.httpclient.methods.GetMethod import org.slf4j.Logger @@ -52,7 +52,7 @@ class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) { if (status == 200) { val response = get.getResponseBodyAsString LOG.info("Successfully resolved AppMaster address: " + response) - system.actorFor(response) + AkkaHelper.actorFor(system, response) } else { throw new IOException("Fail to resolve AppMaster address, please make sure " + s"${report.getOriginalTrackingUrl} is accessible...") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala index 3f029ef..a9fdee5 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala @@ -44,7 +44,7 @@ class MessageDeliverySpec extends TestSpecBase { val appId = restClient.getNextAvailableAppId() val stateJar = cluster.queryBuiltInExampleJars("state-").head - val success = restClient.submitApp(stateJar, args) + val success = restClient.submitApp(stateJar, executorNum = 1, args = args) success shouldBe true // verify #1 http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/project/Build.scala ---------------------------------------------------------------------- diff --git a/project/Build.scala b/project/Build.scala index 0d08718..761c3b9 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -19,7 +19,7 @@ object Build extends sbt.Build { val copySharedSourceFiles = TaskKey[Unit]("copied shared services source code") - val akkaVersion = "2.3.12" + val akkaVersion = "2.4.2" val kryoVersion = "0.3.2" val clouderaVersion = "2.6.0-cdh5.4.2" val clouderaHBaseVersion = "1.0.0-cdh5.4.2" @@ -37,7 +37,7 @@ object Build extends sbt.Build { val slf4jVersion = "1.7.7" val gsCollectionsVersion = "6.2.0" - val crossScalaVersionNumbers = Seq("2.10.5", "2.11.5") + val crossScalaVersionNumbers = Seq("2.11.8") val scalaVersionNumber = crossScalaVersionNumbers.last val sprayVersion = "1.3.2" val sprayJsonVersion = "1.3.1" @@ -48,7 +48,6 @@ object Build extends sbt.Build { val scalazVersion = "7.1.1" val algebirdVersion = "0.9.0" val chillVersion = "0.6.0" - val distDirectory = "output" val projectName = "gearpump" @@ -56,7 +55,7 @@ object Build extends sbt.Build { ++ Pack.projects.toList).toSeq - val commonSettings = Seq(jacoco.settings:_*) ++ sonatypeSettings ++ net.virtualvoid.sbt.graph.Plugin.graphSettings ++ + val commonSettings = Seq(jacoco.settings:_*) ++ sonatypeSettings ++ Seq( resolvers ++= Seq( "patriknw at bintray" at "http://dl.bintray.com/patriknw/maven", @@ -69,8 +68,8 @@ object Build extends sbt.Build { "clockfly" at "http://dl.bintray.com/clockfly/maven", "vincent" at "http://dl.bintray.com/fvunicorn/maven", "clojars" at "http://clojars.org/repo" - ), - addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.0-M5" cross CrossVersion.full) + ) + // ,addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.0-M5" cross CrossVersion.full) ) ++ Seq( scalaVersion := scalaVersionNumber, @@ -133,23 +132,19 @@ object Build extends sbt.Build { val daemonDependencies = Seq( libraryDependencies ++= Seq( - "com.typesafe.akka" %% "akka-contrib" % akkaVersion - exclude("com.typesafe.akka", "akka-persistence-experimental_2.11"), "com.typesafe.akka" %% "akka-cluster" % akkaVersion, - "com.typesafe.akka" %% "akka-http-experimental" % "1.0", - "com.typesafe.akka" %% "akka-http-core-experimental" % "1.0", - "com.typesafe.akka" %% "akka-stream-experimental" % "1.0", - "com.typesafe.akka" %% "akka-http-spray-json-experimental"% "1.0", + "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion, + "com.typesafe.akka" %% "akka-http-experimental" % akkaVersion, + "com.typesafe.akka" %% "akka-http-spray-json-experimental"% akkaVersion, "commons-logging" % "commons-logging" % commonsLoggingVersion, - "com.github.patriknw" %% "akka-data-replication" % dataReplicationVersion, + "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion, "org.apache.hadoop" % "hadoop-common" % clouderaVersion % "provided" ) ) val streamingDependencies = Seq( libraryDependencies ++= Seq( - "com.github.intel-hadoop" % "gearpump-shaded-gs-collections" % gsCollectionsVersion, - "com.typesafe.akka" %% "akka-stream-experimental" % "1.0" + "com.github.intel-hadoop" % "gearpump-shaded-gs-collections" % gsCollectionsVersion ) ) @@ -160,9 +155,24 @@ object Build extends sbt.Build { "org.slf4j" % "slf4j-log4j12" % slf4jVersion, "com.github.intel-hadoop" % "gearpump-shaded-guava" % guavaVersion, "commons-lang" % "commons-lang" % commonsLangVersion, - "com.typesafe.akka" %% "akka-actor" % akkaVersion, - "com.typesafe.akka" %% "akka-remote" % akkaVersion, - "com.typesafe.akka" %% "akka-agent" % akkaVersion, + + /** + * Override Netty version 3.10.3.Final used by Akka 2.4.2 to work-around netty hang issue + * (https://github.com/gearpump/gearpump/issues/2020) + * + * Akka 2.4.2 by default use Netty 3.10.3.Final, which has a serious issue which can hang the + * network. The same issue also happens in version range (3.10.0.Final, 3.10.5.Final) + * Netty 3.10.6.Final have this issue fixed, however, we find there is a 20% performance drop. + * So we decided to downgrade netty to 3.8.0.Final (Same version used in akka 2.3.12). + * + * @see https://github.com/gearpump/gearpump/pull/2017 for more discussions. + */ + "io.netty" % "netty" % "3.8.0.Final", + "com.typesafe.akka" %% "akka-remote" % akkaVersion + exclude("io.netty", "netty"), + + "com.typesafe.akka" %% "akka-actor" % akkaVersion, + "com.typesafe.akka" %% "akka-agent" % akkaVersion, "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, "com.typesafe.akka" %% "akka-kernel" % akkaVersion, "com.github.intel-hadoop" %% "gearpump-shaded-akka-kryo" % kryoVersion, @@ -172,16 +182,12 @@ object Build extends sbt.Build { "org.mockito" % "mockito-core" % mockitoVersion % "test", "junit" % "junit" % junitVersion % "test" ), - libraryDependencies <+= (scalaVersion)("org.scala-lang" % "scala-reflect" % _), - libraryDependencies ++= ( - if (scalaVersion.value.startsWith("2.10")) List("org.scalamacros" %% "quasiquotes" % "2.1.0-M5") - else List("org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.3") - ) + libraryDependencies <+= (scalaVersion)("org.scala-lang" % "scala-reflect" % _) ) lazy val javadocSettings = Seq( - addCompilerPlugin("org.spark-project" %% "genjavadoc-plugin" % - "0.9-spark0" cross CrossVersion.full), + addCompilerPlugin("com.typesafe.genjavadoc" %% "genjavadoc-plugin" % + "0.9" cross CrossVersion.full), scalacOptions += s"-P:genjavadoc:out=${target.value}/java" ) @@ -217,7 +223,7 @@ object Build extends sbt.Build { base = file("."), settings = commonSettings ++ noPublish ++ gearpumpUnidocSetting ).aggregate(core, daemon, streaming, services, external_kafka, external_monoid, external_serializer, - examples, storm, akkastream, yarn, external_hbase, packProject, external_hadoopfs, + examples, storm, yarn, external_hbase, packProject, external_hadoopfs, integration_test).settings(Defaults.itSettings : _*) lazy val core = Project( @@ -274,13 +280,14 @@ object Build extends sbt.Build { lazy val serviceJvmSettings = commonSettings ++ noPublish ++ Seq( libraryDependencies ++= Seq( - "com.typesafe.akka" %% "akka-http-testkit-experimental"% "1.0" % "test", + "com.typesafe.akka" %% "akka-http-testkit"% akkaVersion % "test", "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "com.lihaoyi" %% "upickle" % upickleVersion, "com.softwaremill" %% "akka-http-session" % "0.1.4", - "com.typesafe.akka" %% "akka-http-spray-json-experimental"% "1.0", + "com.typesafe.akka" %% "akka-http-spray-json-experimental"% akkaVersion, "com.github.scribejava" % "scribejava-apis" % "2.4.0", - "com.ning" % "async-http-client" % "1.9.33", + "com.ning" % "async-http-client" % "1.9.33" + exclude("io.netty", "netty"), "org.webjars" % "angularjs" % "1.4.9", "org.webjars.npm" % "angular-touch" % "1.5.0", // angular 1.5 breaks ui-select, but we need ng-touch 1.5 "org.webjars" % "angular-ui-router" % "0.2.15", http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/project/Pack.scala ---------------------------------------------------------------------- diff --git a/project/Pack.scala b/project/Pack.scala index a3852c4..8a5ca8d 100644 --- a/project/Pack.scala +++ b/project/Pack.scala @@ -69,9 +69,7 @@ object Pack extends sbt.Build { "lib/storm" -> new ProjectsToPack(storm.id).exclude(streaming.id) ), packExclude := Seq(thisProjectRef.value.project), - //This is a work-around for https://github.com/gearpump/gearpump/issues/1816 - //Will be removed in the future when Akka release a new version which includes the fix. - packExcludeJars := Seq(s"akka-actor_${scalaBinaryVersion.value}-$akkaVersion.jar"), + packResourceDir += (baseDirectory.value / ".." / "conf" -> "conf"), packResourceDir += (baseDirectory.value / ".." / "yarnconf" -> "conf/yarnconf"), packResourceDir += (baseDirectory.value / ".." / "unmanagedlibs" / scalaBinaryVersion.value -> "lib"), http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/project/plugins.sbt ---------------------------------------------------------------------- diff --git a/project/plugins.sbt b/project/plugins.sbt index 58b0aa6..0477712 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -2,7 +2,7 @@ resolvers += Resolver.url("fvunicorn", url("http://dl.bintray.com/fvunicorn/sbt- resolvers += Classpaths.sbtPluginReleases -addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.4") +addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.8") addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") @@ -10,8 +10,6 @@ addSbtPlugin("io.gearpump.sbt" % "sbt-pack" % "0.7.6") addSbtPlugin("de.johoop" % "jacoco4sbt" % "2.1.6") -addSbtPlugin("com.gilt" % "sbt-dependency-graph-sugar" % "0.7.4") - addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "0.2.1") addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") @@ -22,4 +20,4 @@ addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.1.0") addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "3.0.0") -addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.3") \ No newline at end of file +addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.3") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala index ee70d6b..3a1c4fe 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala @@ -24,7 +24,6 @@ import akka.actor.ActorRef import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.{`Cache-Control`, `Set-Cookie`} -import akka.stream.io.SynchronousFileSource import akka.stream.scaladsl.Source import akka.testkit.TestActor.{AutoPilot, KeepRunning} import akka.testkit.TestProbe @@ -44,6 +43,7 @@ import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} import scala.concurrent.{Future, ExecutionContext} import scala.concurrent.duration._ import scala.util.{Success, Try} +import akka.stream.scaladsl.FileIO class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll { @@ -168,7 +168,9 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with } private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = { - val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(), SynchronousFileSource(file, chunkSize = 100000)) + val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(), + FileIO.fromFile(file, chunkSize = 100000)) + val body = Source.single( Multipart.FormData.BodyPart( "file", http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala index e3504ad..c276286 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala @@ -61,21 +61,21 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher assert(header[`Set-Cookie`].isDefined) val httpCookie = header[`Set-Cookie`].get.cookie assert(httpCookie.name == "gearpump_token") - cookie = new HttpCookiePair(httpCookie.name, httpCookie.value) + cookie = HttpCookiePair.apply(httpCookie.name, httpCookie.value) } // after authentication, everything is fine. - Get("/resource") ~> addHeader(Cookie(cookie)) ~> security.route ~> check { + Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check { responseAs[String] shouldEqual "OK" } // however, guest cannot access high-permission operations, like POST. - Post("/resource") ~> addHeader(Cookie(cookie)) ~> security.route ~> check { + Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check { assert(rejection == AuthorizationFailedRejection) } // logout, should clear the session - Post(s"/logout") ~> addHeader(Cookie(cookie)) ~> security.route ~> check{ + Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check{ assert("{\"user\":\"guest\"}" == responseAs[String]) assert(status.intValue() == 200) assert(header[`Set-Cookie`].isDefined) @@ -106,21 +106,21 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher assert(header[`Set-Cookie`].isDefined) val httpCookie = header[`Set-Cookie`].get.cookie assert(httpCookie.name == "gearpump_token") - cookie = new HttpCookiePair(httpCookie.name, httpCookie.value) + cookie = HttpCookiePair(httpCookie.name, httpCookie.value) } // after authentication, everything is fine. - Get("/resource") ~> addHeader(Cookie(cookie)) ~> security.route ~> check { + Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check { responseAs[String] shouldEqual "OK" } // Not like guest, admimn can also access POST - Post("/resource") ~> addHeader(Cookie(cookie)) ~> security.route ~> check { + Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check { responseAs[String] shouldEqual "OK" } // logout, should clear the session - Post(s"/logout") ~> addHeader(Cookie(cookie)) ~> security.route ~> check{ + Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check{ assert("{\"user\":\"admin\"}" == responseAs[String]) assert(status.intValue() == 200) assert(header[`Set-Cookie`].isDefined) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala index 3e3e0b0..df996e7 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala @@ -19,9 +19,9 @@ package io.gearpump.services.security.oauth2 import akka.actor.ActorSystem -import akka.http.javadsl.model.HttpEntityStrict +import akka.http.scaladsl.model.HttpEntity.Strict import akka.http.scaladsl.model.MediaTypes._ -import akka.http.scaladsl.model.Uri.Path +import akka.http.scaladsl.model.Uri.{Query, Path} import akka.http.scaladsl.model._ import akka.http.scaladsl.testkit.ScalatestRouteTest import com.typesafe.config.ConfigFactory @@ -54,7 +54,7 @@ class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRout uaa.init(configString) it should "generate the correct authorization request" in { - val parameters = Uri(uaa.getAuthorizationUrl()).query.toMap + val parameters = Uri(uaa.getAuthorizationUrl()).query().toMap assert(parameters("response_type") == "code") assert(parameters("client_id") == configMap("clientid")) assert(parameters("redirect_uri") == configMap("callback")) @@ -69,10 +69,10 @@ class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRout def accessTokenEndpoint(request: HttpRequest) = { assert(request.getHeader("Authorization").get.value() == "Basic Z2VhcnB1bXBfdGVzdDI6Z2VhcnB1bXBfdGVzdDI=") - assert(request.entity.contentType().mediaType.value == "application/x-www-form-urlencoded") + assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded") - val body = request.entity.asInstanceOf[HttpEntityStrict].data().decodeString("UTF-8") - val form = Uri./.withQuery(body).query.toMap + val body = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8") + val form = Uri./.withQuery(Query(body)).query().toMap assert(form("grant_type") == "authorization_code") assert(form("code") == "QGGVeA") @@ -94,7 +94,7 @@ class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRout } def protectedResourceEndpoint(request: HttpRequest) = { - assert(request.getUri().parameter("access_token").get == accessToken) + assert(request.getUri().query().get("access_token").get == accessToken) val response = s""" |{ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala index 8fbe43f..ff57baf 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala @@ -19,9 +19,9 @@ package io.gearpump.services.security.oauth2 import akka.actor.ActorSystem -import akka.http.javadsl.model.HttpEntityStrict +import akka.http.scaladsl.model.HttpEntity.Strict import akka.http.scaladsl.model.MediaTypes._ -import akka.http.scaladsl.model.Uri.Path +import akka.http.scaladsl.model.Uri.{Query, Path} import akka.http.scaladsl.model._ import akka.http.scaladsl.testkit.ScalatestRouteTest import com.typesafe.config.ConfigFactory @@ -55,7 +55,7 @@ class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest { google.init(configString) it should "generate the correct authorization request" in { - val parameters = Uri(google.getAuthorizationUrl()).query.toMap + val parameters = Uri(google.getAuthorizationUrl()).query().toMap assert(parameters("response_type") == "code") assert(parameters("client_id") == configMap("clientid")) assert(parameters("redirect_uri") == configMap("callback")) @@ -70,10 +70,10 @@ class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest { def accessTokenEndpoint(request: HttpRequest) = { - assert(request.entity.contentType().mediaType.value == "application/x-www-form-urlencoded") + assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded") - val body = request.entity.asInstanceOf[HttpEntityStrict].data().decodeString("UTF-8") - val form = Uri./.withQuery(body).query.toMap + val body = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8") + val form = Uri./.withQuery(Query(body)).query().toMap assert(form("client_id") == configMap("clientid")) assert(form("client_secret") == configMap("clientsecret")) @@ -95,7 +95,7 @@ class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest { } def protectedResourceEndpoint(request: HttpRequest) = { - assert(request.getUri().parameter("access_token").get == accessToken) + assert(request.getUri().query().get("access_token").get == accessToken) val response =s""" |{ | "kind": "plus#person", http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala b/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala index 88e58a8..2dbdd14 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala @@ -25,7 +25,7 @@ import io.gearpump.transport.{Express, HostPort} import io.gearpump.Message import scala.collection.mutable - +import io.gearpump.util.AkkaHelper /** * ExpressTransport wire the networking function from default akka * networking to customized implementation [[Express]]. @@ -43,7 +43,7 @@ trait ExpressTransport { lazy val sourceId = TaskId.toLong(taskId) lazy val sessionRef: ActorRef = { - system.actorFor(s"/session#$sessionId") + AkkaHelper.actorFor(system, s"/session#$sessionId") } def transport(msg : AnyRef, remotes : TaskId *): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala index cc2f52b..2750cc5 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala @@ -38,7 +38,7 @@ class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll { val dag = DAG(graph) implicit var system: ActorSystem = null val appId = 0 - val userConfig = UserConfig.empty.withValue(StreamApplication.DAG, graph) + lazy val userConfig = UserConfig.empty.withValue(StreamApplication.DAG, graph) "DagManager" should { import io.gearpump.streaming.appmaster.ClockServiceSpec.Store http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala index 70ce643..89d2d64 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala @@ -65,6 +65,7 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with it should "produce 3 messages" in { val context: ClientContext = mock[ClientContext] + when(context.system).thenReturn(system) val app = StreamApp("dsl", context) val list = List[String]( "0", http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/unmanagedlibs/2.10/akka-actor_2.10-2.3.12-fix-1816.jar ---------------------------------------------------------------------- diff --git a/unmanagedlibs/2.10/akka-actor_2.10-2.3.12-fix-1816.jar b/unmanagedlibs/2.10/akka-actor_2.10-2.3.12-fix-1816.jar deleted file mode 100644 index a04c476..0000000 Binary files a/unmanagedlibs/2.10/akka-actor_2.10-2.3.12-fix-1816.jar and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/unmanagedlibs/2.11/akka-actor_2.11-2.3.12-fix-1816.jar ---------------------------------------------------------------------- diff --git a/unmanagedlibs/2.11/akka-actor_2.11-2.3.12-fix-1816.jar b/unmanagedlibs/2.11/akka-actor_2.11-2.3.12-fix-1816.jar deleted file mode 100644 index 5cfb4a6..0000000 Binary files a/unmanagedlibs/2.11/akka-actor_2.11-2.3.12-fix-1816.jar and /dev/null differ
