http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala new file mode 100644 index 0000000..32c9f08 --- /dev/null +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.services + +import java.io.{File, IOException} +import java.nio.charset.StandardCharsets.UTF_8 +import java.nio.file.Files +import java.nio.file.StandardOpenOption.{APPEND, WRITE} +import scala.collection.JavaConverters._ +import scala.concurrent.Future +import scala.util.{Failure, Success} + +import akka.actor.{ActorRef, ActorSystem} +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet +import akka.http.scaladsl.unmarshalling.Unmarshaller._ +import akka.stream.Materializer +import com.typesafe.config.Config + +import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData} +import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ReadOption} +import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest, WorkerList} +import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, MasterConfig, SubmitApplicationResultValue} +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.worker.WorkerSummary +import org.apache.gearpump.cluster.{ClusterConfig, UserConfig} +import org.apache.gearpump.jarstore.JarStoreService +import org.apache.gearpump.partitioner.{PartitionerByClassName, PartitionerDescription} +import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest} +// NOTE: This cannot be removed!!! 
+import org.apache.gearpump.services.util.UpickleUtil._ +import org.apache.gearpump.streaming.{ProcessorDescription, ProcessorId, StreamApplication} +import org.apache.gearpump.util.ActorUtil._ +import org.apache.gearpump.util.FileDirective._ +import org.apache.gearpump.util.{Constants, Graph, Util} + +/** Manages service for master node */ +class MasterService(val master: ActorRef, + val jarStore: JarStoreService, override val system: ActorSystem) + extends BasicService { + + import upickle.default.{read, write} + + private val systemConfig = system.settings.config + private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE) + + protected override def doRoute(implicit mat: Materializer) = pathPrefix("master") { + pathEnd { + get { + onComplete(askActor[MasterData](master, GetMasterData)) { + case Success(value: MasterData) => complete(write(value)) + case Failure(ex) => failWith(ex) + } + } + } ~ + path("applist") { + onComplete(askActor[AppMastersData](master, AppMastersDataRequest)) { + case Success(value: AppMastersData) => + complete(write(value)) + case Failure(ex) => failWith(ex) + } + } ~ + path("workerlist") { + def future: Future[List[WorkerSummary]] = askActor[WorkerList](master, GetAllWorkers) + .flatMap { workerList => + val workers = workerList.workers + val workerDataList = List.empty[WorkerSummary] + + Future.fold(workers.map { workerId => + askWorker[WorkerData](master, workerId, GetWorkerData(workerId)) + })(workerDataList) { (workerDataList, workerData) => + workerDataList :+ workerData.workerDescription + } + } + onComplete(future) { + case Success(result: List[WorkerSummary]) => complete(write(result)) + case Failure(ex) => failWith(ex) + } + } ~ + path("config") { + onComplete(askActor[MasterConfig](master, QueryMasterConfig)) { + case Success(value: MasterConfig) => + val config = Option(value.config).map(ClusterConfig.render(_, concise)).getOrElse("{}") + complete(config) + case Failure(ex) => + 
failWith(ex) + } + } ~ + path("metrics" / RestPath) { path => + parameters(ParamMagnet(ReadOption.Key ? ReadOption.ReadLatest)) { readOption: String => + val query = QueryHistoryMetrics(path.head.toString, readOption) + onComplete(askActor[HistoryMetrics](master, query)) { + case Success(value) => + complete(write(value)) + case Failure(ex) => + failWith(ex) + } + } + } ~ + path("submitapp") { + post { + uploadFile { form => + val jar = form.getFile("jar").map(_.file) + val configFile = form.getFile("configfile").map(_.file) + val configString = form.getValue("configstring").getOrElse("") + val executorCount = form.getValue("executorcount").getOrElse("1").toInt + val args = form.getValue("args").getOrElse("") + + val mergedConfigFile = mergeConfig(configFile, configString) + + onComplete(Future( + MasterService.submitGearApp(jar, executorCount, args, systemConfig, mergedConfigFile) + )) { + case Success(success) => + val response = MasterService.AppSubmissionResult(success) + complete(write(response)) + case Failure(ex) => + failWith(ex) + } + } + } + } ~ + path("submitstormapp") { + post { + uploadFile { form => + val jar = form.getFile("jar").map(_.file) + val configFile = form.getFile("configfile").map(_.file) + val args = form.getValue("args").getOrElse("") + onComplete(Future( + MasterService.submitStormApp(jar, configFile, args, systemConfig) + )) { + case Success(success) => + val response = MasterService.AppSubmissionResult(success) + complete(write(response)) + case Failure(ex) => + failWith(ex) + } + } + } + } ~ + path("submitdag") { + post { + entity(as[String]) { request => + val msg = java.net.URLDecoder.decode(request, "UTF-8") + val submitApplicationRequest = read[SubmitApplicationRequest](msg) + import submitApplicationRequest.{appName, dag, processors, userconfig} + val context = ClientContext(system.settings.config, system, master) + + val graph = dag.mapVertex { processorId => + processors(processorId) + }.mapEdge { (node1, edge, node2) => + 
PartitionerDescription(new PartitionerByClassName(edge)) + } + + val effectiveConfig = if (userconfig == null) UserConfig.empty else userconfig + val appId = context.submit(new StreamApplication(appName, effectiveConfig, graph)) + + import upickle.default.write + val submitApplicationResultValue = SubmitApplicationResultValue(appId) + val jsonData = write(submitApplicationResultValue) + complete(jsonData) + } + } + } ~ + path("uploadjar") { + uploadFile { form => + val jar = form.getFile("jar").map(_.file) + if (jar.isEmpty) { + complete(write( + MasterService.Status(success = false, reason = "Jar file not found"))) + } else { + val jarFile = Util.uploadJar(jar.get, jarStore) + complete(write(jarFile)) + } + } + } ~ + path("partitioners") { + get { + complete(write(BuiltinPartitioners(Constants.BUILTIN_PARTITIONERS.map(_.getName)))) + } + } + } + + /** + * Merges an inline config string into the uploaded config file. + * + * - When configString is null or empty, configFile is returned untouched. + * - When a config file was uploaded, a newline plus configString is appended to it. + * - Otherwise configString is written to a newly created temp file. + */ + private def mergeConfig(configFile: Option[File], configString: String): Option[File] = { + if (configString == null || configString.isEmpty) { + configFile + } else { + configFile match { + case Some(file) => + Files.write(file.toPath, ("\n" + configString).getBytes(UTF_8), APPEND) + Some(file) + case None => + // The prefix must be a plain file-name fragment: a double quote is not a legal + // file-name character on every platform, and this path is later handed to a + // child JVM as a -D option. + val file = File.createTempFile("userfile_configstring_", ".conf") + Files.write(file.toPath, configString.getBytes(UTF_8), WRITE) + Some(file) + } + } + } +} + +object MasterService { + + case class BuiltinPartitioners(partitioners: Array[String]) + + case class AppSubmissionResult(success: Boolean) + + case class Status(success: Boolean, reason: String = null) + + /** + * Submits Native Application. 
+ */ + def submitGearApp( + jar: Option[File], executorNum: Int, args: String, + systemConfig: Config, userConfigFile: Option[File]): Boolean = { + submitAndDeleteTempFiles( + "org.apache.gearpump.cluster.main.AppSubmitter", + argsArray = Array("-executors", executorNum.toString) ++ spaceSeparatedArgumentsToArray(args), + fileMap = Map("jar" -> jar).filter(_._2.isDefined).mapValues(_.get), + classPath = getUserApplicationClassPath, + systemConfig, + userConfigFile + ) + } + + /** + * Submits Storm application. + */ + def submitStormApp( + jar: Option[File], stormConf: Option[File], args: String, systemConfig: Config): Boolean = { + submitAndDeleteTempFiles( + "org.apache.gearpump.experiments.storm.main.GearpumpStormClient", + argsArray = spaceSeparatedArgumentsToArray(args), + fileMap = Map("jar" -> jar, "config" -> stormConf).filter(_._2.isDefined).mapValues(_.get), + classPath = getStormApplicationClassPath, + systemConfig, + userConfigFile = None + ) + } + + private def submitAndDeleteTempFiles( + mainClass: String, argsArray: Array[String], fileMap: Map[String, File], + classPath: Array[String], systemConfig: Config, + userConfigFile: Option[File] = None): Boolean = { + try { + val jar = fileMap.get("jar") + if (jar.isEmpty) { + throw new IOException("JAR file not supplied") + } + + val process = Util.startProcess( + clusterOptions(systemConfig, userConfigFile), + classPath, + mainClass, + arguments = createFilePathArgArray(fileMap) ++ argsArray + ) + + val retval = process.exitValue() + if (retval != 0) { + throw new IOException(s"Process exit abnormally with exit code $retval.\n" + + s"Error message: ${process.logger.error}") + } + true + } finally { + fileMap.values.foreach(_.delete) + if (userConfigFile.isDefined) { + userConfigFile.get.delete() + } + } + } + + /** + * Returns Java options for gearpump cluster + */ + private def clusterOptions(systemConfig: Config, userConfigFile: Option[File]): Array[String] = { + var options = Array( + 
s"-D${Constants.GEARPUMP_HOME}=${systemConfig.getString(Constants.GEARPUMP_HOME)}", + s"-D${Constants.GEARPUMP_HOSTNAME}=${systemConfig.getString(Constants.GEARPUMP_HOSTNAME)}", + s"-D${Constants.PREFER_IPV4}=true" + ) + + val masters = systemConfig.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala + .toList.flatMap(Util.parseHostList) + options ++= masters.zipWithIndex.map { case (master, index) => + s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.$index=${master.host}:${master.port}" + }.toArray[String] + + if (userConfigFile.isDefined) { + options :+= s"-D${Constants.GEARPUMP_CUSTOM_CONFIG_FILE}=${userConfigFile.get.getPath}" + } + options + } + + /** + * Filter all defined file paths and store their config key and path into an array. + */ + private def createFilePathArgArray(fileMap: Map[String, File]): Array[String] = { + var args = Array.empty[String] + fileMap.foreach({ case (key, path) => + args ++= Array(s"-$key", path.getPath) + }) + args + } + + /** + * Returns a space separated arguments as an array. + */ + private def spaceSeparatedArgumentsToArray(str: String): Array[String] = { + str.split(" +").filter(_.nonEmpty) + } + + private val homeDir = System.getProperty(Constants.GEARPUMP_HOME) + "/" + private val libHomeDir = homeDir + "lib/" + + private def getUserApplicationClassPath: Array[String] = { + Array( + homeDir + "conf", + libHomeDir + "daemon/*", + libHomeDir + "yarn/*", + libHomeDir + "*" + ) + } + + private def getStormApplicationClassPath: Array[String] = { + getUserApplicationClassPath ++ Array( + libHomeDir + "storm/*" + ) + } + + case class SubmitApplicationRequest( + appName: String, + processors: Map[ProcessorId, ProcessorDescription], + dag: Graph[Int, String], + userconfig: UserConfig) +}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala new file mode 100644 index 0000000..87f9e34 --- /dev/null +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.services + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor.{ActorRef, ActorSystem} +import akka.http.scaladsl.model.StatusCodes._ +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server.{Route, _} +import akka.stream.ActorMaterializer +import akka.util.Timeout +import org.apache.commons.lang.exception.ExceptionUtils + +import org.apache.gearpump.jarstore.JarStoreService +import org.apache.gearpump.util.{Constants, LogUtil} +// NOTE: This cannot be removed!!! 
+import org.apache.gearpump.services.util.UpickleUtil._ + +/** Contains all REST API service endpoints */ +class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem) + extends RouteService { + + implicit val timeout = Constants.FUTURE_TIMEOUT + + private val config = system.settings.config + + private val jarStoreService = JarStoreService.get(config) + jarStoreService.init(config, system) + + private val LOG = LogUtil.getLogger(getClass) + + private val securityEnabled = config.getBoolean( + Constants.GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED) + + private val supervisorPath = system.settings.config.getString( + Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH) + + private val myExceptionHandler: ExceptionHandler = ExceptionHandler { + case ex: Throwable => { + extractUri { uri => + LOG.error(s"Request to $uri could not be handled normally", ex) + complete(InternalServerError, ExceptionUtils.getStackTrace(ex)) + } + } + } + + // Makes sure staticRoute is the final one, as it will try to lookup resource in local path + // if there is no match in previous routes + private val static = new StaticService(system, supervisorPath).route + + def supervisor: ActorRef = { + if (supervisorPath == null || supervisorPath.isEmpty()) { + null + } else { + val actorRef = system.actorSelection(supervisorPath).resolveOne() + Await.result(actorRef, new Timeout(Duration.create(5, "seconds")).duration) + } + } + + override def route: Route = { + if (securityEnabled) { + val security = new SecurityService(services, system) + handleExceptions(myExceptionHandler) { + security.route ~ static + } + } else { + handleExceptions(myExceptionHandler) { + services.route ~ static + } + } + } + + private def services: RouteService = { + + val admin = new AdminService(system) + val masterService = new MasterService(master, jarStoreService, system) + val worker = new WorkerService(master, system) + val app = new AppMasterService(master, jarStoreService, system) + val sup = new 
SupervisorService(master, supervisor, system) + + new RouteService { + override def route: Route = { + admin.route ~ sup.route ~ masterService.route ~ worker.route ~ app.route + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala new file mode 100644 index 0000000..804b34f --- /dev/null +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.services + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success, Try} + +import akka.actor.ActorSystem +import akka.http.scaladsl.model.headers.{HttpChallenge, HttpCookie, HttpCookiePair} +import akka.http.scaladsl.model.{RemoteAddress, StatusCodes, Uri} +import akka.http.scaladsl.server.AuthenticationFailedRejection.{CredentialsMissing, CredentialsRejected} +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server._ +import akka.http.scaladsl.server.directives.FormFieldDirectives.FieldMagnet +import akka.stream.Materializer +import com.typesafe.config.Config +import com.softwaremill.session.SessionDirectives._ +import com.softwaremill.session.SessionOptions._ +import com.softwaremill.session.{MultiValueSessionSerializer, SessionConfig, SessionManager} +import upickle.default.write + +import org.apache.gearpump.security.{Authenticator => BaseAuthenticator} +import org.apache.gearpump.services.SecurityService.{User, UserSession} +import org.apache.gearpump.services.security.oauth2.OAuth2Authenticator +import org.apache.gearpump.util.{Constants, LogUtil} +// NOTE: This cannot be removed!!! +import org.apache.gearpump.services.util.UpickleUtil._ + +/** + * Security authentication endpoint. + * + * - When a user cannot be authenticated, will reject with 401 AuthenticationFailedRejection + * - When a user can be authenticated, but is not authorized to access a certain resource, will + * return a 405 AuthorizationFailedRejection. + * - When the web UI frontend receives 401, it should redirect the UI to the login page. + * - When the web UI receives 405, it should display errors like + * "current user is not authorized to access this resource." + * + * The Authenticator used is pluggable, the current Authenticator is resolved by looking up + * config path [[org.apache.gearpump.util.Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS]]. 
+ * + * See [[org.apache.gearpump.security.Authenticator]] to find more info on custom Authenticator. + */ +class SecurityService(inner: RouteService, implicit val system: ActorSystem) extends RouteService { + + // Use scheme "GearpumpBasic" to avoid popping up web browser native authentication box. + private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = "gearpump", + params = Map.empty) + + val LOG = LogUtil.getLogger(getClass, "AUDIT") + + private val config = system.settings.config + private val sessionConfig = SessionConfig.fromConfig(config) + private implicit val sessionManager: SessionManager[UserSession] = + new SessionManager[UserSession](sessionConfig) + + private val authenticator = { + val clazz = Class.forName(config.getString(Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS)) + val constructor = clazz.getConstructor(classOf[Config]) + val authenticator = constructor.newInstance(config).asInstanceOf[BaseAuthenticator] + authenticator + } + + private def configToMap(config: Config, path: String) = { + import scala.collection.JavaConverters._ + config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString } + } + + private val oauth2Providers: Map[String, String] = { + if (config.getBoolean(Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ENABLED)) { + val map = configToMap(config, Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS) + map.keys.toList.map { key => + val iconPath = config.getString(s"${Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS}.$key.icon") + (key, iconPath) + }.toMap + } else { + Map.empty[String, String] + } + } + + private def authenticate(user: String, pass: String)(implicit ec: ExecutionContext) + : Future[Option[UserSession]] = { + authenticator.authenticate(user, pass, ec).map { result => + if (result.authenticated) { + Some(UserSession(user, result.permissionLevel)) + } else { + None + } + } + } + + private def rejectMissingCredentials: Route = { + reject(AuthenticationFailedRejection(CredentialsMissing, 
challenge)) + } + + private def rejectWrongCredentials: Route = { + reject(AuthenticationFailedRejection(CredentialsRejected, challenge)) + } + + private def requireAuthentication(inner: UserSession => Route): Route = { + optionalSession(oneOff, usingCookiesOrHeaders) { sessionOption => + sessionOption match { + case Some(session) => { + inner(session) + } + case None => + rejectMissingCredentials + } + } + } + + /** + * Stores the session, sets a "username" cookie for the UI, logs the login, and then either + * redirects to "/" (when redirectToRoot is true) or completes with the serialized User. + */ + private def login(session: UserSession, ip: String, redirectToRoot: Boolean = false): Route = { + setSession(oneOff, usingCookies, session) { + val user = session.user + // Default: 1 day. The cookie Max-Age attribute is expressed in seconds, so the configured + // session max age (already in seconds) is passed through without conversion. + val maxAgeSeconds = sessionConfig.sessionMaxAgeSeconds.getOrElse(24 * 3600L) + setCookie(HttpCookie.fromPair(HttpCookiePair("username", user), path = Some("/"), + maxAge = Some(maxAgeSeconds))) { + LOG.info(s"user $user login from $ip") + if (redirectToRoot) { + redirect(Uri("/"), StatusCodes.TemporaryRedirect) + } else { + complete(write(new User(user))) + } + } + } + } + + private def logout(user: UserSession, ip: String): Route = { + invalidateSession(oneOff, usingCookies) { ctx => + LOG.info(s"user ${user.user} logout from $ip") + ctx.complete(write(new User(user.user))) + } + } + + // Sessions at or above the User permission level get the full route; sessions below it + // are rejected for post/delete/put and may only issue get requests. + private def requireAuthorization(user: UserSession, route: => Route): Route = { + // Valid user + if (user.permissionLevel >= BaseAuthenticator.User.permissionLevel) { + route + } else { + // Possibly a guest or not authenticated. 
+ (put | delete | post) { + // Reject with 405 authorization error + reject(AuthorizationFailedRejection) + } ~ + get { + route + } + } + } + + private val unknownIp: Directive1[RemoteAddress] = { + Directive[Tuple1[RemoteAddress]]{ inner => + inner(new Tuple1(RemoteAddress.Unknown)) + } + } + + override val route: Route = { + + extractExecutionContext{implicit ec: ExecutionContext => + extractMaterializer{implicit mat: Materializer => + (extractClientIP | unknownIp) { ip => + pathPrefix("login") { + pathEndOrSingleSlash { + get { + getFromResource("login/login.html") + } ~ + post { + // Guest account don't have permission to submit new application in UI + formField(FieldMagnet('username.as[String])) {user: String => + formFields(FieldMagnet('password.as[String])) {pass: String => + val result = authenticate(user, pass) + onSuccess(result) { + case Some(session) => + login(session, ip.toString) + case None => + rejectWrongCredentials + } + } + } + } + } ~ + path ("oauth2" / "providers") { + // Responds with a list of OAuth2 providers. + complete(write(oauth2Providers)) + } ~ + // Support OAUTH Authentication + pathPrefix ("oauth2"/ Segment) {providerName => + // Resolve OAUTH Authentication Provider + val oauthService = OAuth2Authenticator.get(config, providerName, ec) + + if (oauthService == null) { + // OAuth2 is disabled. + complete(StatusCodes.NotFound) + } else { + + def loginWithOAuth2Parameters(parameters: Map[String, String]): Route = { + val result = oauthService.authenticate(parameters) + onComplete(result) { + case Success(session) => + login(session, ip.toString, redirectToRoot = true) + case Failure(ex) => { + LOG.info(s"Failed to login user from ${ip.toString}", ex) + rejectWrongCredentials + } + } + } + + path ("authorize") { + // Redirects to OAuth2 service provider for authorization. 
+ redirect(Uri(oauthService.getAuthorizationUrl), StatusCodes.TemporaryRedirect) + } ~ + path ("accesstoken") { + post { + // Guest account don't have permission to submit new application in UI + formField(FieldMagnet('accesstoken.as[String])) {accesstoken: String => + loginWithOAuth2Parameters(Map("accesstoken" -> accesstoken)) + } + } + } ~ + path("callback") { + // Login with authorization code or access token. + parameterMap {parameters => + loginWithOAuth2Parameters(parameters) + } + } + } + } + } ~ + path("logout") { + post { + requireAuthentication {session => + logout(session, ip.toString()) + } + } + } ~ + requireAuthentication {user => + requireAuthorization(user, inner.route) + } + }}} + + } +} + +object SecurityService { + + val SESSION_MANAGER_KEY = "akka.http.session.server-secret" + + case class UserSession(user: String, permissionLevel: Int) + + object UserSession { + + private val User = "user" + private val PermissionLevel = "permissionLevel" + + implicit def serializer: MultiValueSessionSerializer[UserSession] = { + new MultiValueSessionSerializer[UserSession]( + toMap = {t: UserSession => + Map(User -> t.user, PermissionLevel -> t.permissionLevel.toString) + }, + fromMap = {m: Map[String, String] => + if (m.contains(User)) { + Try(UserSession(m(User), m(PermissionLevel).toInt)) + } else { + Failure[UserSession](new Exception("Fail to parse session ")) + } + } + ) + } + } + + case class User(user: String) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala new file mode 100644 index 0000000..284d3f2 --- /dev/null +++ 
b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.services + +import akka.actor.ActorSystem +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server.Directives._ +import akka.stream.Materializer + +import org.apache.gearpump.util.Util +// NOTE: This cannot be removed!!! +import org.apache.gearpump.services.util.UpickleUtil._ + +/** + * static resource files. + */ +class StaticService(override val system: ActorSystem, supervisorPath: String) + extends BasicService { + + private val version = Util.version + + protected override def prefix = Neutral + + override def cache: Boolean = true + + protected override def doRoute(implicit mat: Materializer) = { + path("version") { + get { ctx => + ctx.complete(version) + } + } ~ + // For YARN usage, we need to make sure supervisor-path + // can be accessed without authentication. 
+ path("supervisor-actor-path") { + get { + complete(supervisorPath) + } + } ~ + pathEndOrSingleSlash { + getFromResource("index.html") + } ~ + path("favicon.ico") { + complete(StatusCodes.NotFound) + } ~ + pathPrefix("webjars") { + get { + getFromResourceDirectory("META-INF/resources/webjars") + } + } ~ + path(Rest) { path => + getFromResource("%s" format path) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/SupervisorService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/SupervisorService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/SupervisorService.scala new file mode 100644 index 0000000..fecf5ad --- /dev/null +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/SupervisorService.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.services + +import scala.concurrent.Future +import scala.util.{Failure, Success} + +import akka.actor.{ActorRef, ActorSystem} +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server.Route +import akka.stream.Materializer + +import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData} +import org.apache.gearpump.cluster.ClientToMaster._ +import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.services.SupervisorService.{Path, Status} +import org.apache.gearpump.util.ActorUtil._ +// NOTE: This cannot be removed!!! +import org.apache.gearpump.services.util.UpickleUtil._ + +/** Responsible for adding/removing machines. Typically it delegates to YARN. */ +class SupervisorService( + val master: ActorRef, val supervisor: ActorRef, override val system: ActorSystem) + extends BasicService { + + import upickle.default.write + + /** + * TODO: Add additional check to ensure the user have enough authorization to + * add/remove a worker machine + */ + private def authorize(internal: Route): Route = { + if (supervisor == null) { + failWith(new Exception("API not enabled, cannot find a valid supervisor! 
" + + "Please make sure Gearpump is running on top of YARN or other resource managers")) + } else { + internal + } + } + + protected override def doRoute(implicit mat: Materializer) = pathPrefix("supervisor") { + pathEnd { + get { + val path = if (supervisor == null) { + null + } else { + supervisor.path.toString + } + complete(write(Path(path))) + } + } ~ + path("status") { + post { + if (supervisor == null) { + complete(write(Status(enabled = false))) + } else { + complete(write(Status(enabled = true))) + } + } + } ~ + path("addworker" / IntNumber) { workerCount => + post { + authorize { + onComplete(askActor[CommandResult](supervisor, AddWorker(workerCount))) { + case Success(value) => + complete(write(value)) + case Failure(ex) => + failWith(ex) + } + } + } + } ~ + path("removeworker" / Segment) { workerIdString => + post { + authorize { + val workerId = WorkerId.parse(workerIdString) + def future(): Future[CommandResult] = { + askWorker[WorkerData](master, workerId, GetWorkerData(workerId)).flatMap{workerData => + val containerId = workerData.workerDescription.resourceManagerContainerId + askActor[CommandResult](supervisor, RemoveWorker(containerId)) + } + } + + onComplete[CommandResult](future()) { + case Success(value) => + complete(write(value)) + case Failure(ex) => + failWith(ex) + } + } + } + } + } +} + +object SupervisorService { + case class Status(enabled: Boolean) + + case class Path(path: String) +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala new file mode 100644 index 0000000..8268d61 --- /dev/null +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala @@ -0,0 +1,79 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.services + +import scala.util.{Failure, Success} + +import akka.actor.{ActorRef, ActorSystem} +import akka.http.scaladsl.server.Directives._ +import akka.stream.Materializer + +import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData} +import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ReadOption} +import org.apache.gearpump.cluster.ClusterConfig +import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, WorkerConfig} +import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.util.ActorUtil._ +import org.apache.gearpump.util.Constants +// NOTE: This cannot be removed!!! 
import org.apache.gearpump.services.util.UpickleUtil._

/**
 * Service to handle worker related queries.
 *
 * Routes served under "worker/<workerId>":
 *  - worker/<workerId>                : the worker description, serialized with upickle
 *  - worker/<workerId>/config         : the rendered worker config, or "{}" when it is null
 *  - worker/<workerId>/metrics/<path> : history metrics for the first segment of <path>
 *
 * @param master actor that worker queries are sent through
 * @param system actor system handed to [[BasicService]]
 */
class WorkerService(val master: ActorRef, override val system: ActorSystem)
  extends BasicService {

  import upickle.default.write

  // Whether configs are rendered in concise form; read once from the actor system settings.
  private val concise =
    system.settings.config.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE)

  protected override def doRoute(implicit mat: Materializer) = pathPrefix("worker" / Segment) {
    workerIdString =>
      // Deliberately a def: the id is parsed inside whichever branch matched, never up front.
      def requestedWorker: WorkerId = WorkerId.parse(workerIdString)

      pathEnd {
        val workerId = requestedWorker
        onComplete(askWorker[WorkerData](master, workerId, GetWorkerData(workerId))) {
          case Failure(error) =>
            failWith(error)
          case Success(data: WorkerData) =>
            complete(write(data.workerDescription))
        }
      } ~
      path("config") {
        val workerId = requestedWorker
        onComplete(askWorker[WorkerConfig](master, workerId, QueryWorkerConfig(workerId))) {
          case Failure(error) =>
            failWith(error)
          case Success(reply: WorkerConfig) =>
            // A null config is answered with an empty JSON object.
            val rendered = Option(reply.config).fold("{}")(ClusterConfig.render(_, concise))
            complete(rendered)
        }
      } ~
      path("metrics" / RestPath) { path =>
        val workerId = requestedWorker
        parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption =>
          val metricsQuery = QueryHistoryMetrics(path.head.toString, readOption)
          onComplete(askWorker[HistoryMetrics](master, workerId, metricsQuery)) {
            case Failure(error) =>
              failWith(error)
            case Success(metrics) =>
              complete(write(metrics))
          }
        }
      }
  }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/main/Services.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/main/Services.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/main/Services.scala new file mode 100644 index 0000000..23c95c4 --- /dev/null +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/main/Services.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.services.main + +import java.util.Random +import scala.collection.JavaConverters._ +import scala.concurrent.Await + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.server.Route +import akka.stream.ActorMaterializer +import com.typesafe.config.ConfigValueFactory +import org.slf4j.Logger +import sun.misc.BASE64Encoder + +import org.apache.gearpump.cluster.ClusterConfig +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, Gear} +import org.apache.gearpump.cluster.master.MasterProxy +import org.apache.gearpump.services.{RestServices, SecurityService} +import org.apache.gearpump.util.LogUtil.ProcessType +import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil, Util} + +/** Command line to start UI server */ +object Services extends AkkaApp with ArgumentsParser { + + private val LOG = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array( + "master" -> CLIOption("<host:port>", required = false), + Gear.OPTION_CONFIG -> CLIOption("<provide a custom configuration file>", required = false), + "supervisor" -> CLIOption("<Supervisor Actor Path>", required = false, Some(""))) + + override val description = "UI Server" + + override def akkaConfig: Config = { + ClusterConfig.ui() + } + + override def help(): Unit = { + // scalastyle:off println + Console.println("UI Server") + // scalastyle:on println + } + + private var killFunction: Option[() => Unit] = None + + override def main(inputAkkaConf: Config, args: Array[String]): Unit = { + + val argConfig = parse(args) + var akkaConf = + if (argConfig.exists(Gear.OPTION_CONFIG)) { + ClusterConfig.ui(argConfig.getString(Gear.OPTION_CONFIG)) + } else { + inputAkkaConf + } + + val LOG: Logger = { + LogUtil.loadConfiguration(akkaConf, ProcessType.UI) + LogUtil.getLogger(getClass) + } + + if (argConfig.exists("master")) { + val master = argConfig.getString("master") + akkaConf = 
akkaConf.withValue(Constants.GEARPUMP_CLUSTER_MASTERS, + ConfigValueFactory.fromIterable(List(master).asJava)) + } + + akkaConf = akkaConf.withValue(Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH, + ConfigValueFactory.fromAnyRef(argConfig.getString("supervisor"))) + // Creates a random unique secret key for session manager. + // All previous stored session token cookies will be invalidated when UI + // server is restarted. + .withValue(SecurityService.SESSION_MANAGER_KEY, + ConfigValueFactory.fromAnyRef(randomSeverSecret())) + + val masterCluster = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala + .flatMap(Util.parseHostList) + + implicit val system = ActorSystem("services", akkaConf) + implicit val executionContext = system.dispatcher + + import scala.concurrent.duration._ + val master = system.actorOf(MasterProxy.props(masterCluster, 1.day), + s"masterproxy${system.name}") + val (host, port) = parseHostPort(system.settings.config) + + implicit val mat = ActorMaterializer() + val services = new RestServices(master, mat, system) + + val bindFuture = Http().bindAndHandle(Route.handlerFlow(services.route), host, port) + Await.result(bindFuture, 15.seconds) + + val displayHost = if (host == "0.0.0.0") "127.0.0.1" else host + LOG.info(s"Please browse to http://$displayHost:$port to see the web UI") + + // scalastyle:off println + println(s"Please browse to http://$displayHost:$port to see the web UI") + // scalastyle:on println + + killFunction = Some { () => + LOG.info("Shutting down UI Server") + system.terminate() + } + + Await.result(system.whenTerminated, Duration.Inf) + } + + private def randomSeverSecret(): String = { + val random = new Random() + val length = 64 // Required + val bytes = new Array[Byte](length) + random.nextBytes(bytes) + val endecoder = new BASE64Encoder() + endecoder.encode(bytes) + } + + private def parseHostPort(config: Config): (String, Int) = { + val port = config.getInt(Constants.GEARPUMP_SERVICE_HTTP) + val host = 
config.getString(Constants.GEARPUMP_SERVICE_HOST) + (host, port) + } + + // TODO: fix this + // Hacks around for YARN module, so that we can kill the UI server + // when application is shutting down. + def kill(): Unit = { + if (killFunction.isDefined) { + killFunction.get.apply() + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/package.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/package.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/package.scala new file mode 100644 index 0000000..12dec1d --- /dev/null +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/package.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump + +package object services { + /** Version label "v1.0" shared by the services package. NOTE(review): presumably the version segment of the REST API URLs - its consumers are not visible here, confirm. */ + final val REST_VERSION = "v1.0" +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/OAuth2Authenticator.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/OAuth2Authenticator.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/OAuth2Authenticator.scala new file mode 100644 index 0000000..e9008aa --- /dev/null +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/OAuth2Authenticator.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.services.security.oauth2 + +import scala.concurrent.{ExecutionContext, Future} + +import com.typesafe.config.Config + +import org.apache.gearpump.services.SecurityService.UserSession +import org.apache.gearpump.util.Constants +import org.apache.gearpump.util.Constants._ + +/** + * + * Uses OAuth2 social-login as the mechanism for authentication. 
+ * @see [[https://tools.ietf.org/html/rfc6749]] to find what is OAuth2, and how it works. + * + * Basically flow for OAuth2 Authentication: + * 1. User accesses Gearpump UI website, and choose to login with OAuth2 server. + * 2. Gearpump UI website redirects user to OAuth2 server domain authorization endpoint. + * 3. End user complete the authorization in the domain of OAuth2 server. + * 4. OAuth2 server redirects user back to Gearpump UI server. + * 5. Gearpump UI server verify the tokens and extract credentials from query + * parameters and form fields. + * + * NOTE: '''Thread-safety''' is a MUST requirement. Developer need to ensure the sub-class is + * thread-safe. Sub-class should have a parameterless constructor. + * + * NOTE: OAuth2 Authenticator requires access of Internet. Please make sure HTTP proxy are + * set properly if applied. + * + * Example: Config proxy when UI server is started on Windows: + * {{{ + * > set JAVA_OPTS=-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 -Dhttps.proxyHost=xx.com + * -Dhttps.proxyPort=8088 + * > bin\services + * }}} + * + * Example: Config proxy when UI server is started on Linux: + * {{{ + * $ export JAVA_OPTS="-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 -Dhttps.proxyHost=xx.com + * -Dhttps.proxyPort=8088" + * $ bin/services + * }}} + */ +trait OAuth2Authenticator { + + /** + * Inits authenticator with config which contains client ID, client secret, and etc.. + * + * Typically, the client key and client secret is provided by OAuth2 Authorization server + * when user register an application there. + * See [[https://tools.ietf.org/html/rfc6749]] for definition of client, client Id, + * and client secret. + * + * See [[https://developer.github.com/v3/oauth/]] for an actual example of how Github + * use client key, and client secret. + * + * NOTE: '''Thread-Safety''': Framework ensures this call is synchronized. + * + * @param config Client Id, client secret, callback URL and etc.. 
+ * @param executionContext ExecutionContext from hosting environment. + */ + def init(config: Config, executionContext: ExecutionContext): Unit + + /** + * Returns the OAuth Authorization URL so for redirection to that address to do OAuth2 + * authorization. + * + * NOTE: '''Thread-Safety''': This can be called in a multi-thread environment. Developer + * need to ensure thread safety. + */ + def getAuthorizationUrl: String + + /** + * After authorization, OAuth2 server redirects user back with tokens. This verify the + * tokens, retrieve the profiles, and return [[UserSession]] information. + * + * NOTE: This is an Async call. + * + * NOTE: This call requires external internet access. + * + * NOTE: '''Thread-Safety''': This can be called in a multi-thread environment. Developer + * need to ensure thread safety. + * + * @param parameters HTTP Query and Post parameters, which typically contains Authorization code. + * @return UserSession if authentication pass. + */ + def authenticate(parameters: Map[String, String]): Future[UserSession] + + /** + * Clean up resource + */ + def close(): Unit +} + +object OAuth2Authenticator { + + // Serves as a quick immutable lookup cache + private var providers = Map.empty[String, OAuth2Authenticator] + + /** + * Load Authenticator from [[Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS]] + * + * @param provider, Name for the OAuth2 Authentication Service. + * @return Returns null if the OAuth2 Authentication is disabled. 
+ */ + def get(config: Config, provider: String, executionContext: ExecutionContext) + : OAuth2Authenticator = { + + if (providers.contains(provider)) { + providers(provider) + } else { + val path = s"${Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS}.$provider" + val enabled = config.getBoolean(Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ENABLED) + if (enabled && config.hasPath(path)) { + this.synchronized { + if (providers.contains(provider)) { + providers(provider) + } else { + val authenticatorConfig = config.getConfig(path) + val authenticatorClass = authenticatorConfig.getString( + GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS) + val clazz = Thread.currentThread().getContextClassLoader.loadClass(authenticatorClass) + val authenticator = clazz.newInstance().asInstanceOf[OAuth2Authenticator] + authenticator.init(authenticatorConfig, executionContext) + providers += provider -> authenticator + authenticator + } + } + } else { + null + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala new file mode 100644 index 0000000..603fb1d --- /dev/null +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.services.security.oauth2.impl

import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.StringBuilder
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

import com.typesafe.config.Config
import com.github.scribejava.core.builder.ServiceBuilderAsync
import com.github.scribejava.core.builder.api.DefaultApi20
import com.github.scribejava.core.model._
import com.github.scribejava.core.oauth.OAuth20Service
import com.github.scribejava.core.utils.OAuthEncoder
import com.ning.http.client.AsyncHttpClientConfig

import org.apache.gearpump.security.Authenticator
import org.apache.gearpump.services.SecurityService.UserSession
import org.apache.gearpump.services.security.oauth2.OAuth2Authenticator
import org.apache.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator.BaseApi20
import org.apache.gearpump.util.Constants._
import org.apache.gearpump.util.Util

/**
 * Uses Ning AsyncClient to connect to OAuth2 service.
 *
 * Sub-classes supply the endpoint URLs, the scope and the user name extraction; this class
 * builds the ScribeJava service and drives the access-token / authorization-code flows.
 *
 * See [[org.apache.gearpump.services.security.oauth2.OAuth2Authenticator]]
 * for more API information.
 */
abstract class BaseOAuth2Authenticator extends OAuth2Authenticator {

  // Authorize Url for end user to authorize
  protected def authorizeUrl: String

  // Used to fetch the Access Token.
  protected def accessTokenEndpoint: String

  // Protected resource Url to get the user profile
  protected def protectedResourceUrl: String

  // Extracts the username information from response of protectedResourceUrl
  protected def extractUserName(body: String): String

  // Scope required to access protectedResourceUrl
  protected def scope: String

  // OAuth2 endpoint definition for ScribeJava.
  protected def oauth2Api(): DefaultApi20 = {
    new BaseApi20(authorizeUrl, accessTokenEndpoint)
  }

  // Built on the first init() call; null until then.
  protected var oauthService: OAuth20Service = null

  // Handed in by init(); null until then.
  protected var executionContext: ExecutionContext = null

  // Permission level given to every authenticated user; overwritten by init().
  private var defaultPermissionLevel = Authenticator.Guest.permissionLevel

  // Flipped by the first close() call so the HTTP client is closed only once.
  private val isClosed: AtomicBoolean = new AtomicBoolean(false)

  /**
   * Reads callback, client id, client secret and the default user role from `config` and
   * builds the OAuth2 service. Calls after the first successful one are no-ops.
   *
   * Synchronization ensured by the caller.
   */
  override def init(config: Config, executionContext: ExecutionContext): Unit = {
    if (this.oauthService == null) {
      val callback = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CALLBACK)
      val clientId = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_ID)
      val clientSecret = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_SECRET)
      // Unknown role names map to the unauthenticated permission level.
      defaultPermissionLevel =
        config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_DEFAULT_USER_ROLE) match {
          case "guest" => Authenticator.Guest.permissionLevel
          case "user" => Authenticator.User.permissionLevel
          case "admin" => Authenticator.Admin.permissionLevel
          case _ => Authenticator.UnAuthenticated.permissionLevel
        }
      this.oauthService = buildOAuth2Service(clientId, clientSecret, callback)
      this.executionContext = executionContext
    }
  }

  /** Closes the underlying async HTTP client, at most once. */
  override def close(): Unit = {
    val firstClose = isClosed.compareAndSet(false, true)
    if (firstClose) {
      Option(oauthService)
        .flatMap(service => Option(service.getAsyncHttpClient()))
        .foreach(_.close())
    }
  }

  override def getAuthorizationUrl(): String = {
    oauthService.getAuthorizationUrl()
  }

  /**
   * Fetches the protected resource with the given access token and turns the response body
   * into a [[UserSession]] carrying the default permission level.
   *
   * @return a future that fails with whatever the request or the extraction throws
   */
  protected def authenticateWithAccessToken(accessToken: OAuth2AccessToken): Future[UserSession] = {
    val sessionPromise = Promise[UserSession]()
    val profileRequest = new OAuthRequestAsync(Verb.GET, protectedResourceUrl, oauthService)
    oauthService.signRequest(accessToken, profileRequest)

    val callback = new OAuthAsyncRequestCallback[Response] {
      override def onCompleted(response: Response): Unit = {
        try {
          val userName = extractUserName(response.getBody)
          sessionPromise.success(new UserSession(userName, defaultPermissionLevel))
        } catch {
          case ex: Throwable =>
            sessionPromise.failure(ex)
        }
      }

      override def onThrowable(throwable: Throwable): Unit = {
        sessionPromise.failure(throwable)
      }
    }
    profileRequest.sendAsync(callback)
    sessionPromise.future
  }

  /**
   * Exchanges the authorization code for an access token, then continues with
   * [[authenticateWithAccessToken]].
   */
  protected def authenticateWithAuthorizationCode(code: String): Future[UserSession] = {

    implicit val ec: ExecutionContext = executionContext

    val sessionPromise = Promise[UserSession]()
    val tokenCallback = new OAuthAsyncRequestCallback[OAuth2AccessToken] {
      override def onCompleted(accessToken: OAuth2AccessToken): Unit = {
        // Forward the outcome of the second step, success or failure, to the caller.
        authenticateWithAccessToken(accessToken).onComplete { outcome =>
          sessionPromise.complete(outcome)
        }
      }

      override def onThrowable(throwable: Throwable): Unit = {
        sessionPromise.failure(throwable)
      }
    }
    oauthService.getAccessTokenAsync(code, tokenCallback)
    sessionPromise.future
  }

  /**
   * Authenticates with the access token when one is present in `parameters`, otherwise with
   * the authorization code; fails when neither is present.
   */
  override def authenticate(parameters: Map[String, String]): Future[UserSession] = {

    val code = parameters.get(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_AUTHORIZATION_CODE)
    val accessToken = parameters.get(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ACCESS_TOKEN)

    (accessToken, code) match {
      case (Some(token), _) =>
        authenticateWithAccessToken(new OAuth2AccessToken(token))
      case (None, Some(authorizationCode)) =>
        authenticateWithAuthorizationCode(authorizationCode)
      case (None, None) =>
        // Fails authentication if code not exist
        Future.failed(
          new Exception("Fail to authenticate user as there is no code parameter in URL"))
    }
  }

  /** Builds the async ScribeJava service with a dedicated Ning client configuration. */
  private def buildOAuth2Service(clientId: String, clientSecret: String, callback: String)
    : OAuth20Service = {
    val state = "state" + Util.randInt()
    ScribeJavaConfig.setForceTypeOfHttpRequests(
      ForceTypeOfHttpRequest.FORCE_ASYNC_ONLY_HTTP_REQUESTS)

    val httpClientConfig = new AsyncHttpClientConfig.Builder()
      .setMaxConnections(5)
      .setUseProxyProperties(true)
      .setRequestTimeout(60000)
      .setAllowPoolingConnections(false)
      .setPooledConnectionIdleTimeout(60000)
      .setReadTimeout(60000)
      .build

    new ServiceBuilderAsync()
      .apiKey(clientId)
      .apiSecret(clientSecret)
      .scope(scope)
      .state(state)
      .callback(callback)
      .asyncHttpClientConfig(httpClientConfig)
      .build(oauth2Api())
  }
}

object BaseOAuth2Authenticator {

  /**
   * ScribeJava API definition driven by two URLs.
   *
   * @param authorizeUrl format string filled with response type, api key, encoded callback
   *                     and encoded scope, in that order
   * @param accessTokenEndpoint URL used to fetch the access token
   */
  class BaseApi20(authorizeUrl: String, accessTokenEndpoint: String) extends DefaultApi20 {
    def getAccessTokenEndpoint: String = {
      accessTokenEndpoint
    }

    def getAuthorizationUrl(config: OAuthConfig): String = {
      val baseUrl = String.format(authorizeUrl,
        config.getResponseType, config.getApiKey, OAuthEncoder.encode(config.getCallback),
        OAuthEncoder.encode(config.getScope))
      // The state parameter is appended only when one is configured.
      Option(config.getState).fold(baseUrl) { state =>
        s"$baseUrl&${OAuthConstants.STATE}=${OAuthEncoder.encode(state)}"
      }
    }

    override def createService(config: OAuthConfig): OAuth20Service = {
      new OAuth20Service(this, config) {

        protected override def createAccessTokenRequest[T <: AbstractRequest](
            code: String, request: T): T = {
          super.createAccessTokenRequest(code, request)

          val grantTypeMissing = !getConfig.hasGrantType
          if (grantTypeMissing) {
            request.addParameter(OAuthConstants.GRANT_TYPE, OAuthConstants.AUTHORIZATION_CODE)
          }

          // Work-around for issue https://github.com/scribejava/scribejava/issues/641
          request.addHeader("Content-Type", "application/x-www-form-urlencoded")
          request
        }
      }
    }
  }
}
\ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala new file mode 100644 index 0000000..ded50a7 --- /dev/null +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.services.security.oauth2.impl + +import scala.concurrent.{ExecutionContext, Future, Promise} + +import com.typesafe.config.Config +import com.github.scribejava.core.builder.api.DefaultApi20 +import com.github.scribejava.core.model._ +import com.github.scribejava.core.oauth.OAuth20Service +import com.ning.http.client +import com.ning.http.client.{AsyncCompletionHandler, AsyncHttpClient} +import spray.json.{JsString, _} +import sun.misc.BASE64Encoder + +import org.apache.gearpump.services.SecurityService.UserSession +import org.apache.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator.BaseApi20 +import org.apache.gearpump.util.Constants._ + +/** + * + * Does authentication with CloudFoundry UAA service. Currently it only + * extract the email address of end user. + * + * For what is UAA, + * See [[https://github.com/cloudfoundry/uaa for information about CloudFoundry UAA]] + * (User Account and Authentication Service) + * + * Pre-requisite steps to use this Authenticator: + * + * Step1: Register your website to UAA with tool uaac. 
+ * 1) Check tutorial on uaac at + * [[https://docs.cloudfoundry.org/adminguide/uaa-user-management.html]] + * + * 2) Open a bash shell, set the UAA server by command `uaac target` + * {{{ + * uaac target [your uaa server url] + * }}} + * + * NOTE: [your uaa server url] should match the uaahost settings in gear.conf + * + * 3) Login in as user admin by + * {{{ + * uaac token client get admin -s MyAdminPassword + * }}} + * + * 4) Create a new Application (Client) in UAA, + * {{{ + * uaac client add [your_client_id] + * --scope "openid cloud_controller.read" + * --authorized_grant_types "authorization_code client_credentials refresh_token" + * --authorities "openid cloud_controller.read" + * --redirect_uri [your_redirect_url] + * --autoapprove true + * --secret [your_client_secret] + * }}} + * + * Step2: Configure the OAuth2 information in gear.conf + * + * 1) Enable OAuth2 authentication by setting "gearpump.ui-security.oauth2-authenticator-enabled" + * as true. + * + * 2) Navigate to section "gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa" + * + * 3) Config gear.conf "gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa" section. + * Please make sure class name, client ID, client Secret, and callback URL are set properly. + * + * NOTE: The callback URL here should match what you set on CloudFoundry UAA in step1. + * + * Step3: Restart the UI service and try the "social login" button for UAA. + * + * NOTE: OAuth requires Internet access, @see + * [[org.apache.gearpump.services.security.oauth2.OAuth2Authenticator]] to find tutorials to + * configure Internet proxy. + * + * See [[org.apache.gearpump.services.security.oauth2.OAuth2Authenticator]] for more background + * information of OAuth2. 
+ */ +class CloudFoundryUAAOAuth2Authenticator extends BaseOAuth2Authenticator { + + import org.apache.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator._ + + private var host: String = null + + protected override def authorizeUrl: String = + s"$host/oauth/authorize?response_type=%s&client_id=%s&redirect_uri=%s&scope=%s" + + protected override def accessTokenEndpoint: String = s"$host/oauth/token" + + protected override def protectedResourceUrl: String = s"$host/userinfo" + + protected override def scope: String = "openid,cloud_controller.read" + + private var additionalAuthenticator: Option[AdditionalAuthenticator] = None + + override def init(config: Config, executionContext: ExecutionContext): Unit = { + host = config.getString("uaahost") + super.init(config, executionContext) + + if (config.getBoolean(ADDITIONAL_AUTHENTICATOR_ENABLED)) { + val additionalAuthenticatorConfig = config.getConfig(ADDITIONAL_AUTHENTICATOR) + val authenticatorClass = additionalAuthenticatorConfig + .getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS) + val clazz = Thread.currentThread().getContextClassLoader.loadClass(authenticatorClass) + val authenticator = clazz.newInstance().asInstanceOf[AdditionalAuthenticator] + authenticator.init(additionalAuthenticatorConfig, executionContext) + additionalAuthenticator = Option(authenticator) + } + } + + protected override def extractUserName(body: String): String = { + val email = body.parseJson.asJsObject.fields("email").asInstanceOf[JsString] + email.value + } + + protected override def oauth2Api(): DefaultApi20 = { + new CloudFoundryUAAService(authorizeUrl, accessTokenEndpoint) + } + + protected override def authenticateWithAccessToken(accessToken: OAuth2AccessToken) + : Future[UserSession] = { + + implicit val ec: ExecutionContext = executionContext + + if (additionalAuthenticator.isDefined) { + super.authenticateWithAccessToken(accessToken).flatMap { user => + 
additionalAuthenticator.get.authenticate(oauthService.getAsyncHttpClient, accessToken, user) + } + } else { + super.authenticateWithAccessToken(accessToken) + } + } +} + +object CloudFoundryUAAOAuth2Authenticator { + private val RESPONSE_TYPE = "response_type" + + val ADDITIONAL_AUTHENTICATOR_ENABLED = "additional-authenticator-enabled" + val ADDITIONAL_AUTHENTICATOR = "additional-authenticator" + + private class CloudFoundryUAAService(authorizeUrl: String, accessTokenEndpoint: String) + extends BaseApi20(authorizeUrl, accessTokenEndpoint) { + + private def base64(in: String): String = { + val encoder = new BASE64Encoder() + val utf8 = "UTF-8" + encoder.encode(in.getBytes(utf8)) + } + + override def createService(config: OAuthConfig): OAuth20Service = { + new OAuth20Service(this, config) { + + protected override def createAccessTokenRequest[T <: AbstractRequest]( + code: String, request: T): T = { + val config: OAuthConfig = getConfig() + + request.addParameter(OAuthConstants.GRANT_TYPE, OAuthConstants.AUTHORIZATION_CODE) + request.addParameter(OAuthConstants.CODE, code) + request.addParameter(RESPONSE_TYPE, "token") + request.addParameter(OAuthConstants.REDIRECT_URI, config.getCallback) + + // Work around issue https://github.com/scribejava/scribejava/issues/641 + request.addHeader("Content-Type", "application/x-www-form-urlencoded") + + // CloudFoundry requires a Authorization header encoded with client Id and secret. + val authorizationHeader = "Basic " + base64(config.getApiKey + ":" + config.getApiSecret) + request.addHeader("Authorization", authorizationHeader) + request + } + } + } + } + + /** + * Additional authenticator to check more credential attributes of user before logging in. + * This authenticator is applied AFTER user pass the initial (default) authenticator. + */ + trait AdditionalAuthenticator { + + /** + * Initialization + * + * @param config Configurations specifically used for this authenticator. 
+ * @param executionContext Execution Context to use to run futures. + */ + def init(config: Config, executionContext: ExecutionContext): Unit + + /** + * + * @param accessToken, the accessToken for the UAA + * @param user user session returned by previous authenticator + * @return an updated UserSession + */ + def authenticate( + asyncClient: AsyncHttpClient, accessToken: OAuth2AccessToken, user: UserSession) + : Future[UserSession] + } + + val ORGANIZATION_URL = "organization-url" + + class OrganizationAccessChecker extends AdditionalAuthenticator { + private var organizationUrl: String = null + private implicit var executionContext: ExecutionContext = null + + override def init(config: Config, executionContext: ExecutionContext): Unit = { + this.organizationUrl = config.getString(ORGANIZATION_URL) + this.executionContext = executionContext + } + + override def authenticate(asyncClient: AsyncHttpClient, accessToken: OAuth2AccessToken, + user: UserSession): Future[UserSession] = { + + val promise = Promise[UserSession]() + val builder = asyncClient.prepareGet(organizationUrl) + builder.addHeader("Authorization", s"bearer ${accessToken.getAccessToken}") + builder.execute(new AsyncCompletionHandler[Unit] { + override def onCompleted(response: client.Response): Unit = { + if (response.getStatusCode == 200) { + promise.success(user) + } else { + promise.failure(new Exception(response.getResponseBody)) + } + } + }) + promise.future + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala new file mode 100644 
index 0000000..c62071c --- /dev/null +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.services.security.oauth2.impl + +import com.github.scribejava.apis.google.GoogleJsonTokenExtractor +import com.github.scribejava.core.builder.api.DefaultApi20 +import com.github.scribejava.core.extractors.TokenExtractor +import com.github.scribejava.core.model._ +import spray.json._ + +/** + * + * Does authentication with Google OAuth2 service. It only extracts the email address + * from the user's Google profile. + * + * Pre-requisite steps to use this Authenticator: + * + * Step1: Register your website as an OAuth2 Application on Google + * 1) Create an application representing your website at [[https://console.developers.google.com]] + * + * 2) In "API Manager" of your created application, enable API "Google+ API" + * + * 3) Create an OAuth client ID for this application. In the "Credentials" tab of "API Manager", + * choose "Create credentials", and then select OAuth client ID. Follow the wizard + * to set the callback URL, and generate the client ID and client Secret. 
Callback URL is NOT optional. + * + * Step2: Configure the OAuth2 information in gear.conf + * + * 1) Enable OAuth2 authentication by setting "gearpump.ui-security.oauth2-authenticator-enabled" + * to true. + * + * 2) Configure section "gearpump.ui-security.oauth2-authenticators.google". Please make sure + * class name, client ID, client Secret, and callback URL are set properly. + * + * NOTE: callback URL set here should match what is configured on Google in step1. + * + * Step3: Restart the UI service and try out the Google social login button in the UI. + * + * NOTE: OAuth requires Internet access, @see + * [[org.apache.gearpump.services.security.oauth2.OAuth2Authenticator]] to find + * some helpful tutorials + * + * NOTE: Google uses scopes to define what data can be fetched by OAuth2. Currently we use the scope + * [[https://www.googleapis.com/auth/userinfo.email]]. However, Google may change this scope + * in the future. + * + * TODO Currently, this doesn't verify the state from Google OAuth2 response. + * + * See [[org.apache.gearpump.services.security.oauth2.OAuth2Authenticator]] for more + * API information. 
+ */ +class GoogleOAuth2Authenticator extends BaseOAuth2Authenticator { + + import org.apache.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator._ + + protected override def authorizeUrl: String = AuthorizeUrl + + protected override def accessTokenEndpoint: String = AccessEndpoint + + protected override def protectedResourceUrl: String = ResourceUrl + + protected override def scope: String = GoogleOAuth2Authenticator.Scope + + protected override def extractUserName(body: String): String = { + val emails = body.parseJson.asJsObject.fields("emails").asInstanceOf[JsArray] + val email = emails.elements(0).asJsObject("Cannot find email account") + .fields("value").asInstanceOf[JsString].value + email + } + + override def oauth2Api(): DefaultApi20 = new AsyncGoogleApi20(authorizeUrl, accessTokenEndpoint) +} + +object GoogleOAuth2Authenticator { + + import org.apache.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator._ + + // scalastyle:off line.size.limit + val AuthorizeUrl = "https://accounts.google.com/o/oauth2/auth?response_type=%s&client_id=%s&redirect_uri=%s&scope=%s" + // scalastyle:on line.size.limit + val AccessEndpoint = "https://www.googleapis.com/oauth2/v4/token" + val ResourceUrl = "https://www.googleapis.com/plus/v1/people/me" + val Scope = "https://www.googleapis.com/auth/userinfo.email" + + private class AsyncGoogleApi20(authorizeUrl: String, accessEndpoint: String) + extends BaseApi20(authorizeUrl, accessEndpoint) { + + override def getAccessTokenExtractor: TokenExtractor[OAuth2AccessToken] = { + GoogleJsonTokenExtractor.instance + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala 
b/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala new file mode 100644 index 0000000..caa3a33 --- /dev/null +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.services.util + +import upickle.Js + +import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.util.Graph + +object UpickleUtil { + + // For implicit type, we need to add EXPLICIT return type, otherwise, upickle may NOT infer the + // reader type automatically. 
+ // See issue https://github.com/lihaoyi/upickle-pprint/issues/102 + implicit val graphReader: upickle.default.Reader[Graph[Int, String]] = { + upickle.default.Reader[Graph[Int, String]] { + case Js.Obj(verties, edges) => + val vertexList = upickle.default.readJs[List[Int]](verties._2) + val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2) + Graph(vertexList, edgeList) + } + } + + implicit val workerIdReader: upickle.default.Reader[WorkerId] = upickle.default.Reader[WorkerId] { + case Js.Str(str) => + WorkerId.parse(str) + } + + implicit val workerIdWriter: upickle.default.Writer[WorkerId] = upickle.default.Writer[WorkerId] { + case workerId: WorkerId => + Js.Str(WorkerId.render(workerId)) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala deleted file mode 100644 index e67731e..0000000 --- a/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.services - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.ActorSystem -import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} -import com.typesafe.config.Config -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import io.gearpump.cluster.TestUtil - -// NOTE: This cannot be removed!!! -import io.gearpump.services.util.UpickleUtil._ - -class AdminServiceSpec - extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll { - - override def testConfig: Config = TestUtil.DEFAULT_CONFIG - - implicit def actorSystem: ActorSystem = system - - it should "shutdown the ActorSystem when receiving terminate" in { - val route = new AdminService(actorSystem).route - implicit val customTimeout = RouteTestTimeout(15.seconds) - (Post(s"/terminate") ~> route) ~> check { - assert(status.intValue() == 404) - } - - Await.result(actorSystem.whenTerminated, 20.seconds) - - // terminate should terminate current actor system - assert(actorSystem.whenTerminated.isCompleted) - } -}
