http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala b/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala deleted file mode 100644 index d0fea30..0000000 --- a/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.services - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.{ActorRef, ActorSystem} -import akka.http.scaladsl.model.StatusCodes._ -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server.{Route, _} -import akka.stream.ActorMaterializer -import akka.util.Timeout -import org.apache.commons.lang.exception.ExceptionUtils - -import io.gearpump.jarstore.JarStoreService -import io.gearpump.util.{Constants, LogUtil} -// NOTE: This cannot be removed!!! 
-import io.gearpump.services.util.UpickleUtil._ - -/** Contains all REST API service endpoints */ -class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem) - extends RouteService { - - implicit val timeout = Constants.FUTURE_TIMEOUT - - private val config = system.settings.config - - private val jarStoreService = JarStoreService.get(config) - jarStoreService.init(config, system) - - private val LOG = LogUtil.getLogger(getClass) - - private val securityEnabled = config.getBoolean( - Constants.GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED) - - private val supervisorPath = system.settings.config.getString( - Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH) - - private val myExceptionHandler: ExceptionHandler = ExceptionHandler { - case ex: Throwable => { - extractUri { uri => - LOG.error(s"Request to $uri could not be handled normally", ex) - complete(InternalServerError, ExceptionUtils.getStackTrace(ex)) - } - } - } - - // Makes sure staticRoute is the final one, as it will try to lookup resource in local path - // if there is no match in previous routes - private val static = new StaticService(system, supervisorPath).route - - def supervisor: ActorRef = { - if (supervisorPath == null || supervisorPath.isEmpty()) { - null - } else { - val actorRef = system.actorSelection(supervisorPath).resolveOne() - Await.result(actorRef, new Timeout(Duration.create(5, "seconds")).duration) - } - } - - override def route: Route = { - if (securityEnabled) { - val security = new SecurityService(services, system) - handleExceptions(myExceptionHandler) { - security.route ~ static - } - } else { - handleExceptions(myExceptionHandler) { - services.route ~ static - } - } - } - - private def services: RouteService = { - - val admin = new AdminService(system) - val masterService = new MasterService(master, jarStoreService, system) - val worker = new WorkerService(master, system) - val app = new AppMasterService(master, jarStoreService, system) - val sup = new 
SupervisorService(master, supervisor, system) - - new RouteService { - override def route: Route = { - admin.route ~ sup.route ~ masterService.route ~ worker.route ~ app.route - } - } - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala b/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala deleted file mode 100644 index b1abf62..0000000 --- a/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.services - -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success, Try} - -import akka.actor.ActorSystem -import akka.http.scaladsl.model.headers.{HttpChallenge, HttpCookie, HttpCookiePair} -import akka.http.scaladsl.model.{RemoteAddress, StatusCodes, Uri} -import akka.http.scaladsl.server.AuthenticationFailedRejection.{CredentialsMissing, CredentialsRejected} -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server._ -import akka.http.scaladsl.server.directives.FormFieldDirectives.FieldMagnet -import akka.stream.Materializer -import com.typesafe.config.Config -import com.softwaremill.session.SessionDirectives._ -import com.softwaremill.session.SessionOptions._ -import com.softwaremill.session.{MultiValueSessionSerializer, SessionConfig, SessionManager} -import upickle.default.write - -import io.gearpump.security.{Authenticator => BaseAuthenticator} -import io.gearpump.services.SecurityService.{User, UserSession} -import io.gearpump.services.security.oauth2.OAuth2Authenticator -import io.gearpump.util.{Constants, LogUtil} -// NOTE: This cannot be removed!!! -import io.gearpump.services.util.UpickleUtil._ - -/** - * Security authentication endpoint. - * - * - When user cannot be authenticated, will reject with 401 AuthenticationFailedRejection - * - When user can be authenticated, but are not authorized to access certail resource, will - * return a 405 AuthorizationFailedRejection. - * - When web UI frontend receive 401, it should redirect the UI to login page. - * - When web UI receive 405,it should display errors like - * "current user is not authorized to access this resource." - * - * The Authenticator used is pluggable, the current Authenticator is resolved by looking up - * config path [[io.gearpump.util.Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS]]. - * - * See [[io.gearpump.security.Authenticator]] to find more info on custom Authenticator. 
- */ -class SecurityService(inner: RouteService, implicit val system: ActorSystem) extends RouteService { - - // Use scheme "GearpumpBasic" to avoid popping up web browser native authentication box. - private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = "gearpump", - params = Map.empty) - - val LOG = LogUtil.getLogger(getClass, "AUDIT") - - private val config = system.settings.config - private val sessionConfig = SessionConfig.fromConfig(config) - private implicit val sessionManager: SessionManager[UserSession] = - new SessionManager[UserSession](sessionConfig) - - private val authenticator = { - val clazz = Class.forName(config.getString(Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS)) - val constructor = clazz.getConstructor(classOf[Config]) - val authenticator = constructor.newInstance(config).asInstanceOf[BaseAuthenticator] - authenticator - } - - private def configToMap(config: Config, path: String) = { - import scala.collection.JavaConverters._ - config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString } - } - - private val oauth2Providers: Map[String, String] = { - if (config.getBoolean(Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ENABLED)) { - val map = configToMap(config, Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS) - map.keys.toList.map { key => - val iconPath = config.getString(s"${Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS}.$key.icon") - (key, iconPath) - }.toMap - } else { - Map.empty[String, String] - } - } - - private def authenticate(user: String, pass: String)(implicit ec: ExecutionContext) - : Future[Option[UserSession]] = { - authenticator.authenticate(user, pass, ec).map { result => - if (result.authenticated) { - Some(UserSession(user, result.permissionLevel)) - } else { - None - } - } - } - - private def rejectMissingCredentials: Route = { - reject(AuthenticationFailedRejection(CredentialsMissing, challenge)) - } - - private def rejectWrongCredentials: Route = { - 
reject(AuthenticationFailedRejection(CredentialsRejected, challenge)) - } - - private def requireAuthentication(inner: UserSession => Route): Route = { - optionalSession(oneOff, usingCookiesOrHeaders) { sessionOption => - sessionOption match { - case Some(session) => { - inner(session) - } - case None => - rejectMissingCredentials - } - } - } - - private def login(session: UserSession, ip: String, redirectToRoot: Boolean = false): Route = { - setSession(oneOff, usingCookies, session) { - val user = session.user - // Default: 1 day - val maxAgeMs = 1000 * sessionConfig.sessionMaxAgeSeconds.getOrElse(24 * 3600L) - setCookie(HttpCookie.fromPair(HttpCookiePair("username", user), path = Some("/"), - maxAge = Some(maxAgeMs))) { - LOG.info(s"user $user login from $ip") - if (redirectToRoot) { - redirect(Uri("/"), StatusCodes.TemporaryRedirect) - } else { - complete(write(new User(user))) - } - } - } - } - - private def logout(user: UserSession, ip: String): Route = { - invalidateSession(oneOff, usingCookies) { ctx => - LOG.info(s"user ${user.user} logout from $ip") - ctx.complete(write(new User(user.user))) - } - } - - // Only admin are able to access operation like post/delete/put - private def requireAuthorization(user: UserSession, route: => Route): Route = { - // Valid user - if (user.permissionLevel >= BaseAuthenticator.User.permissionLevel) { - route - } else { - // Possibly a guest or not authenticated. 
- (put | delete | post) { - // Reject with 405 authorization error - reject(AuthorizationFailedRejection) - } ~ - get { - route - } - } - } - - private val unknownIp: Directive1[RemoteAddress] = { - Directive[Tuple1[RemoteAddress]]{ inner => - inner(new Tuple1(RemoteAddress.Unknown)) - } - } - - override val route: Route = { - - extractExecutionContext{implicit ec: ExecutionContext => - extractMaterializer{implicit mat: Materializer => - (extractClientIP | unknownIp) { ip => - pathPrefix("login") { - pathEndOrSingleSlash { - get { - getFromResource("login/login.html") - } ~ - post { - // Guest account don't have permission to submit new application in UI - formField(FieldMagnet('username.as[String])) {user: String => - formFields(FieldMagnet('password.as[String])) {pass: String => - val result = authenticate(user, pass) - onSuccess(result) { - case Some(session) => - login(session, ip.toString) - case None => - rejectWrongCredentials - } - } - } - } - } ~ - path ("oauth2" / "providers") { - // Responds with a list of OAuth2 providers. - complete(write(oauth2Providers)) - } ~ - // Support OAUTH Authentication - pathPrefix ("oauth2"/ Segment) {providerName => - // Resolve OAUTH Authentication Provider - val oauthService = OAuth2Authenticator.get(config, providerName, ec) - - if (oauthService == null) { - // OAuth2 is disabled. - complete(StatusCodes.NotFound) - } else { - - def loginWithOAuth2Parameters(parameters: Map[String, String]): Route = { - val result = oauthService.authenticate(parameters) - onComplete(result) { - case Success(session) => - login(session, ip.toString, redirectToRoot = true) - case Failure(ex) => { - LOG.info(s"Failed to login user from ${ip.toString}", ex) - rejectWrongCredentials - } - } - } - - path ("authorize") { - // Redirects to OAuth2 service provider for authorization. 
- redirect(Uri(oauthService.getAuthorizationUrl), StatusCodes.TemporaryRedirect) - } ~ - path ("accesstoken") { - post { - // Guest account don't have permission to submit new application in UI - formField(FieldMagnet('accesstoken.as[String])) {accesstoken: String => - loginWithOAuth2Parameters(Map("accesstoken" -> accesstoken)) - } - } - } ~ - path("callback") { - // Login with authorization code or access token. - parameterMap {parameters => - loginWithOAuth2Parameters(parameters) - } - } - } - } - } ~ - path("logout") { - post { - requireAuthentication {session => - logout(session, ip.toString()) - } - } - } ~ - requireAuthentication {user => - requireAuthorization(user, inner.route) - } - }}} - - } -} - -object SecurityService { - - val SESSION_MANAGER_KEY = "akka.http.session.server-secret" - - case class UserSession(user: String, permissionLevel: Int) - - object UserSession { - - private val User = "user" - private val PermissionLevel = "permissionLevel" - - implicit def serializer: MultiValueSessionSerializer[UserSession] = { - new MultiValueSessionSerializer[UserSession]( - toMap = {t: UserSession => - Map(User -> t.user, PermissionLevel -> t.permissionLevel.toString) - }, - fromMap = {m: Map[String, String] => - if (m.contains(User)) { - Try(UserSession(m(User), m(PermissionLevel).toInt)) - } else { - Failure[UserSession](new Exception("Fail to parse session ")) - } - } - ) - } - } - - case class User(user: String) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala b/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala deleted file mode 100644 index fc990a1..0000000 --- a/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala +++ /dev/null @@ -1,70 +0,0 @@ 
-/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.services - -import akka.actor.ActorSystem -import akka.http.scaladsl.model._ -import akka.http.scaladsl.server.Directives._ -import akka.stream.Materializer - -import io.gearpump.util.Util -// NOTE: This cannot be removed!!! -import io.gearpump.services.util.UpickleUtil._ - -/** - * static resource files. - */ -class StaticService(override val system: ActorSystem, supervisorPath: String) - extends BasicService { - - private val version = Util.version - - protected override def prefix = Neutral - - override def cache: Boolean = true - - protected override def doRoute(implicit mat: Materializer) = { - path("version") { - get { ctx => - ctx.complete(version) - } - } ~ - // For YARN usage, we need to make sure supervisor-path - // can be accessed without authentication. 
- path("supervisor-actor-path") { - get { - complete(supervisorPath) - } - } ~ - pathEndOrSingleSlash { - getFromResource("index.html") - } ~ - path("favicon.ico") { - complete(StatusCodes.NotFound) - } ~ - pathPrefix("webjars") { - get { - getFromResourceDirectory("META-INF/resources/webjars") - } - } ~ - path(Rest) { path => - getFromResource("%s" format path) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala b/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala deleted file mode 100644 index c7f19e4..0000000 --- a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.services - -import scala.concurrent.Future -import scala.util.{Failure, Success} - -import akka.actor.{ActorRef, ActorSystem} -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server.Route -import akka.stream.Materializer - -import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData} -import io.gearpump.cluster.ClientToMaster._ -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.services.SupervisorService.{Path, Status} -import io.gearpump.util.ActorUtil._ -// NOTE: This cannot be removed!!! -import io.gearpump.services.util.UpickleUtil._ - -/** Responsible for adding/removing machines. Typically it delegates to YARN. */ -class SupervisorService( - val master: ActorRef, val supervisor: ActorRef, override val system: ActorSystem) - extends BasicService { - - import upickle.default.write - - /** - * TODO: Add additional check to ensure the user have enough authorization to - * add/remove a worker machine - */ - private def authorize(internal: Route): Route = { - if (supervisor == null) { - failWith(new Exception("API not enabled, cannot find a valid supervisor! 
" + - "Please make sure Gearpump is running on top of YARN or other resource managers")) - } else { - internal - } - } - - protected override def doRoute(implicit mat: Materializer) = pathPrefix("supervisor") { - pathEnd { - get { - val path = if (supervisor == null) { - null - } else { - supervisor.path.toString - } - complete(write(Path(path))) - } - } ~ - path("status") { - post { - if (supervisor == null) { - complete(write(Status(enabled = false))) - } else { - complete(write(Status(enabled = true))) - } - } - } ~ - path("addworker" / IntNumber) { workerCount => - post { - authorize { - onComplete(askActor[CommandResult](supervisor, AddWorker(workerCount))) { - case Success(value) => - complete(write(value)) - case Failure(ex) => - failWith(ex) - } - } - } - } ~ - path("removeworker" / Segment) { workerIdString => - post { - authorize { - val workerId = WorkerId.parse(workerIdString) - def future(): Future[CommandResult] = { - askWorker[WorkerData](master, workerId, GetWorkerData(workerId)).flatMap{workerData => - val containerId = workerData.workerDescription.resourceManagerContainerId - askActor[CommandResult](supervisor, RemoveWorker(containerId)) - } - } - - onComplete[CommandResult](future()) { - case Success(value) => - complete(write(value)) - case Failure(ex) => - failWith(ex) - } - } - } - } - } -} - -object SupervisorService { - case class Status(enabled: Boolean) - - case class Path(path: String) -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala b/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala deleted file mode 100644 index 78cdb37..0000000 --- a/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.services - -import scala.util.{Failure, Success} - -import akka.actor.{ActorRef, ActorSystem} -import akka.http.scaladsl.server.Directives._ -import akka.stream.Materializer - -import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData} -import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ReadOption} -import io.gearpump.cluster.ClusterConfig -import io.gearpump.cluster.MasterToClient.{HistoryMetrics, WorkerConfig} -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.util.ActorUtil._ -import io.gearpump.util.Constants -// NOTE: This cannot be removed!!! 
-import io.gearpump.services.util.UpickleUtil._ - -/** Service to handle worker related queries */ -class WorkerService(val master: ActorRef, override val system: ActorSystem) - extends BasicService { - - import upickle.default.write - private val systemConfig = system.settings.config - private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE) - - protected override def doRoute(implicit mat: Materializer) = pathPrefix("worker" / Segment) { - workerIdString => { - pathEnd { - val workerId = WorkerId.parse(workerIdString) - onComplete(askWorker[WorkerData](master, workerId, GetWorkerData(workerId))) { - case Success(value: WorkerData) => - complete(write(value.workerDescription)) - case Failure(ex) => failWith(ex) - } - } - }~ - path("config") { - val workerId = WorkerId.parse(workerIdString) - onComplete(askWorker[WorkerConfig](master, workerId, QueryWorkerConfig(workerId))) { - case Success(value: WorkerConfig) => - val config = Option(value.config).map(ClusterConfig.render(_, concise)).getOrElse("{}") - complete(config) - case Failure(ex) => - failWith(ex) - } - } ~ - path("metrics" / RestPath ) { path => - val workerId = WorkerId.parse(workerIdString) - parameter(ReadOption.Key ? 
ReadOption.ReadLatest) { readOption => - val query = QueryHistoryMetrics(path.head.toString, readOption) - onComplete(askWorker[HistoryMetrics](master, workerId, query)) { - case Success(value) => - complete(write(value)) - case Failure(ex) => - failWith(ex) - } - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala b/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala deleted file mode 100644 index a647a8d..0000000 --- a/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.services.main - -import java.util.Random -import scala.collection.JavaConverters._ -import scala.concurrent.Await - -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.http.scaladsl.server.Route -import akka.stream.ActorMaterializer -import com.typesafe.config.ConfigValueFactory -import org.slf4j.Logger -import sun.misc.BASE64Encoder - -import io.gearpump.cluster.ClusterConfig -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, Gear} -import io.gearpump.cluster.master.MasterProxy -import io.gearpump.services.{RestServices, SecurityService} -import io.gearpump.util.LogUtil.ProcessType -import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util} - -/** Command line to start UI server */ -object Services extends AkkaApp with ArgumentsParser { - - private val LOG = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = Array( - "master" -> CLIOption("<host:port>", required = false), - Gear.OPTION_CONFIG -> CLIOption("<provide a custom configuration file>", required = false), - "supervisor" -> CLIOption("<Supervisor Actor Path>", required = false, Some(""))) - - override val description = "UI Server" - - override def akkaConfig: Config = { - ClusterConfig.ui() - } - - override def help(): Unit = { - // scalastyle:off println - Console.println("UI Server") - // scalastyle:on println - } - - private var killFunction: Option[() => Unit] = None - - override def main(inputAkkaConf: Config, args: Array[String]): Unit = { - - val argConfig = parse(args) - var akkaConf = - if (argConfig.exists(Gear.OPTION_CONFIG)) { - ClusterConfig.ui(argConfig.getString(Gear.OPTION_CONFIG)) - } else { - inputAkkaConf - } - - val LOG: Logger = { - LogUtil.loadConfiguration(akkaConf, ProcessType.UI) - LogUtil.getLogger(getClass) - } - - if (argConfig.exists("master")) { - val master = argConfig.getString("master") - akkaConf = akkaConf.withValue(Constants.GEARPUMP_CLUSTER_MASTERS, - 
ConfigValueFactory.fromIterable(List(master).asJava)) - } - - akkaConf = akkaConf.withValue(Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH, - ConfigValueFactory.fromAnyRef(argConfig.getString("supervisor"))) - // Creates a random unique secret key for session manager. - // All previous stored session token cookies will be invalidated when UI - // server is restarted. - .withValue(SecurityService.SESSION_MANAGER_KEY, - ConfigValueFactory.fromAnyRef(randomSeverSecret())) - - val masterCluster = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala - .flatMap(Util.parseHostList) - - implicit val system = ActorSystem("services", akkaConf) - implicit val executionContext = system.dispatcher - - import scala.concurrent.duration._ - val master = system.actorOf(MasterProxy.props(masterCluster, 1.day), - s"masterproxy${system.name}") - val (host, port) = parseHostPort(system.settings.config) - - implicit val mat = ActorMaterializer() - val services = new RestServices(master, mat, system) - - val bindFuture = Http().bindAndHandle(Route.handlerFlow(services.route), host, port) - Await.result(bindFuture, 15.seconds) - - val displayHost = if (host == "0.0.0.0") "127.0.0.1" else host - LOG.info(s"Please browse to http://$displayHost:$port to see the web UI") - - // scalastyle:off println - println(s"Please browse to http://$displayHost:$port to see the web UI") - // scalastyle:on println - - killFunction = Some { () => - LOG.info("Shutting down UI Server") - system.terminate() - } - - Await.result(system.whenTerminated, Duration.Inf) - } - - private def randomSeverSecret(): String = { - val random = new Random() - val length = 64 // Required - val bytes = new Array[Byte](length) - random.nextBytes(bytes) - val endecoder = new BASE64Encoder() - endecoder.encode(bytes) - } - - private def parseHostPort(config: Config): (String, Int) = { - val port = config.getInt(Constants.GEARPUMP_SERVICE_HTTP) - val host = config.getString(Constants.GEARPUMP_SERVICE_HOST) - (host, port) 
- } - - // TODO: fix this - // Hacks around for YARN module, so that we can kill the UI server - // when application is shutting down. - def kill(): Unit = { - if (killFunction.isDefined) { - killFunction.get.apply() - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/package.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/package.scala b/services/jvm/src/main/scala/io/gearpump/services/package.scala deleted file mode 100644 index 7b38134..0000000 --- a/services/jvm/src/main/scala/io/gearpump/services/package.scala +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump - -package object services { - final val REST_VERSION = "v1.0" -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala deleted file mode 100644 index 158d406..0000000 --- a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.services.security.oauth2 - -import scala.concurrent.{ExecutionContext, Future} - -import com.typesafe.config.Config - -import io.gearpump.services.SecurityService.UserSession -import io.gearpump.util.Constants -import io.gearpump.util.Constants._ - -/** - * - * Uses OAuth2 social-login as the mechanism for authentication. - * @see [[https://tools.ietf.org/html/rfc6749]] to find what is OAuth2, and how it works. 
- * - * Basically flow for OAuth2 Authentication: - * 1. User accesses Gearpump UI website, and choose to login with OAuth2 server. - * 2. Gearpump UI website redirects user to OAuth2 server domain authorization endpoint. - * 3. End user complete the authorization in the domain of OAuth2 server. - * 4. OAuth2 server redirects user back to Gearpump UI server. - * 5. Gearpump UI server verify the tokens and extract credentials from query - * parameters and form fields. - * - * NOTE: '''Thread-safety''' is a MUST requirement. Developer need to ensure the sub-class is - * thread-safe. Sub-class should have a parameterless constructor. - * - * NOTE: OAuth2 Authenticator requires access of Internet. Please make sure HTTP proxy are - * set properly if applied. - * - * Example: Config proxy when UI server is started on Windows: - * {{{ - * > set JAVA_OPTS=-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 -Dhttps.proxyHost=xx.com - * -Dhttps.proxyPort=8088 - * > bin\services - * }}} - * - * Example: Config proxy when UI server is started on Linux: - * {{{ - * $ export JAVA_OPTS="-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 -Dhttps.proxyHost=xx.com - * -Dhttps.proxyPort=8088" - * $ bin/services - * }}} - */ -trait OAuth2Authenticator { - - /** - * Inits authenticator with config which contains client ID, client secret, and etc.. - * - * Typically, the client key and client secret is provided by OAuth2 Authorization server - * when user register an application there. - * See [[https://tools.ietf.org/html/rfc6749]] for definition of client, client Id, - * and client secret. - * - * See [[https://developer.github.com/v3/oauth/]] for an actual example of how Github - * use client key, and client secret. - * - * NOTE: '''Thread-Safety''': Framework ensures this call is synchronized. - * - * @param config Client Id, client secret, callback URL and etc.. - * @param executionContext ExecutionContext from hosting environment. 
- */ - def init(config: Config, executionContext: ExecutionContext): Unit - - /** - * Returns the OAuth Authorization URL so for redirection to that address to do OAuth2 - * authorization. - * - * NOTE: '''Thread-Safety''': This can be called in a multi-thread environment. Developer - * need to ensure thread safety. - */ - def getAuthorizationUrl: String - - /** - * After authorization, OAuth2 server redirects user back with tokens. This verify the - * tokens, retrieve the profiles, and return [[UserSession]] information. - * - * NOTE: This is an Async call. - * - * NOTE: This call requires external internet access. - * - * NOTE: '''Thread-Safety''': This can be called in a multi-thread environment. Developer - * need to ensure thread safety. - * - * @param parameters HTTP Query and Post parameters, which typically contains Authorization code. - * @return UserSession if authentication pass. - */ - def authenticate(parameters: Map[String, String]): Future[UserSession] - - /** - * Clean up resource - */ - def close(): Unit -} - -object OAuth2Authenticator { - - // Serves as a quick immutable lookup cache - private var providers = Map.empty[String, OAuth2Authenticator] - - /** - * Load Authenticator from [[Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS]] - * - * @param provider, Name for the OAuth2 Authentication Service. - * @return Returns null if the OAuth2 Authentication is disabled. 
- */ - def get(config: Config, provider: String, executionContext: ExecutionContext) - : OAuth2Authenticator = { - - if (providers.contains(provider)) { - providers(provider) - } else { - val path = s"${Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS}.$provider" - val enabled = config.getBoolean(Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ENABLED) - if (enabled && config.hasPath(path)) { - this.synchronized { - if (providers.contains(provider)) { - providers(provider) - } else { - val authenticatorConfig = config.getConfig(path) - val authenticatorClass = authenticatorConfig.getString( - GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS) - val clazz = Thread.currentThread().getContextClassLoader.loadClass(authenticatorClass) - val authenticator = clazz.newInstance().asInstanceOf[OAuth2Authenticator] - authenticator.init(authenticatorConfig, executionContext) - providers += provider -> authenticator - authenticator - } - } - } else { - null - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala deleted file mode 100644 index e422d3f..0000000 --- a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.services.security.oauth2.impl - -import java.util.concurrent.atomic.AtomicBoolean -import scala.collection.mutable.StringBuilder -import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.util.{Failure, Success} - -import com.typesafe.config.Config -import com.github.scribejava.core.builder.ServiceBuilderAsync -import com.github.scribejava.core.builder.api.DefaultApi20 -import com.github.scribejava.core.model._ -import com.github.scribejava.core.oauth.OAuth20Service -import com.github.scribejava.core.utils.OAuthEncoder -import com.ning.http.client.AsyncHttpClientConfig - -import io.gearpump.security.Authenticator -import io.gearpump.services.SecurityService.UserSession -import io.gearpump.services.security.oauth2.OAuth2Authenticator -import io.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator.BaseApi20 -import io.gearpump.util.Constants._ -import io.gearpump.util.Util - -/** - * Uses Ning AsyncClient to connect to OAuth2 service. - * - * See [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] for more API information. - */ -abstract class BaseOAuth2Authenticator extends OAuth2Authenticator { - - // Authorize Url for end user to authorize - protected def authorizeUrl: String - - // Used to fetch the Access Token. 
- protected def accessTokenEndpoint: String - - // Protected resource Url to get the user profile - protected def protectedResourceUrl: String - - // Extracts the username information from response of protectedResourceUrl - protected def extractUserName(body: String): String - - // Scope required to access protectedResourceUrl - protected def scope: String - - // OAuth2 endpoint definition for ScribeJava. - protected def oauth2Api(): DefaultApi20 = { - new BaseApi20(authorizeUrl, accessTokenEndpoint) - } - - protected var oauthService: OAuth20Service = null - - protected var executionContext: ExecutionContext = null - - private var defaultPermissionLevel = Authenticator.Guest.permissionLevel - - // Synchronization ensured by the caller - override def init(config: Config, executionContext: ExecutionContext): Unit = { - if (this.oauthService == null) { - val callback = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CALLBACK) - val clientId = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_ID) - val clientSecret = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_SECRET) - defaultPermissionLevel = { - val role = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_DEFAULT_USER_ROLE) - role match { - case "guest" => Authenticator.Guest.permissionLevel - case "user" => Authenticator.User.permissionLevel - case "admin" => Authenticator.Admin.permissionLevel - case _ => Authenticator.UnAuthenticated.permissionLevel - } - } - this.oauthService = buildOAuth2Service(clientId, clientSecret, callback) - this.executionContext = executionContext - } - } - - private val isClosed: AtomicBoolean = new AtomicBoolean(false) - - override def close(): Unit = { - if (isClosed.compareAndSet(false, true)) { - if (null != oauthService && null != oauthService.getAsyncHttpClient()) { - oauthService.getAsyncHttpClient().close() - } - } - } - - override def getAuthorizationUrl(): String = { - oauthService.getAuthorizationUrl() - } - - protected def 
authenticateWithAccessToken(accessToken: OAuth2AccessToken): Future[UserSession] = { - val promise = Promise[UserSession]() - val request = new OAuthRequestAsync(Verb.GET, protectedResourceUrl, oauthService) - oauthService.signRequest(accessToken, request) - request.sendAsync { - new OAuthAsyncRequestCallback[Response] { - override def onCompleted(response: Response): Unit = { - try { - val user = extractUserName(response.getBody) - promise.success(new UserSession(user, defaultPermissionLevel)) - } catch { - case ex: Throwable => - promise.failure(ex) - } - } - - override def onThrowable(throwable: Throwable): Unit = { - promise.failure(throwable) - } - } - } - promise.future - } - - protected def authenticateWithAuthorizationCode(code: String): Future[UserSession] = { - - implicit val ec: ExecutionContext = executionContext - - val promise = Promise[UserSession]() - oauthService.getAccessTokenAsync(code, - - new OAuthAsyncRequestCallback[OAuth2AccessToken] { - override def onCompleted(accessToken: OAuth2AccessToken): Unit = { - authenticateWithAccessToken(accessToken).onComplete { - case Success(user) => promise.success(user) - case Failure(ex) => promise.failure(ex) - } - } - - override def onThrowable(throwable: Throwable): Unit = { - promise.failure(throwable) - } - }) - promise.future - } - - override def authenticate(parameters: Map[String, String]): Future[UserSession] = { - - val code = parameters.get(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_AUTHORIZATION_CODE) - val accessToken = parameters.get(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ACCESS_TOKEN) - - if (accessToken.isDefined) { - authenticateWithAccessToken(new OAuth2AccessToken(accessToken.get)) - } else if (code.isDefined) { - authenticateWithAuthorizationCode(code.get) - } else { - // Fails authentication if code not exist - Future.failed(new Exception("Fail to authenticate user as there is no code parameter in URL")) - } - } - - private def buildOAuth2Service(clientId: String, clientSecret: String, callback: 
String) - : OAuth20Service = { - val state: String = "state" + Util.randInt() - ScribeJavaConfig.setForceTypeOfHttpRequests( - ForceTypeOfHttpRequest.FORCE_ASYNC_ONLY_HTTP_REQUESTS) - val clientConfig: AsyncHttpClientConfig = new AsyncHttpClientConfig.Builder() - .setMaxConnections(5) - .setUseProxyProperties(true) - .setRequestTimeout(60000) - .setAllowPoolingConnections(false) - .setPooledConnectionIdleTimeout(60000) - .setReadTimeout(60000).build - - val service: OAuth20Service = new ServiceBuilderAsync() - .apiKey(clientId) - .apiSecret(clientSecret) - .scope(scope) - .state(state) - .callback(callback) - .asyncHttpClientConfig(clientConfig) - .build(oauth2Api()) - - service - } -} - -object BaseOAuth2Authenticator { - - class BaseApi20(authorizeUrl: String, accessTokenEndpoint: String) extends DefaultApi20 { - def getAccessTokenEndpoint: String = { - accessTokenEndpoint - } - - def getAuthorizationUrl(config: OAuthConfig): String = { - val sb: StringBuilder = new StringBuilder(String.format(authorizeUrl, - config.getResponseType, config.getApiKey, OAuthEncoder.encode(config.getCallback), - OAuthEncoder.encode(config.getScope))) - val state: String = config.getState - if (state != null) { - sb.append('&').append(OAuthConstants.STATE).append('=').append(OAuthEncoder.encode(state)) - } - sb.toString - } - - override def createService(config: OAuthConfig): OAuth20Service = { - new OAuth20Service(this, config) { - - protected override def createAccessTokenRequest[T <: AbstractRequest]( - code: String, request: T): T = { - super.createAccessTokenRequest(code, request) - - if (!getConfig.hasGrantType) { - request.addParameter(OAuthConstants.GRANT_TYPE, OAuthConstants.AUTHORIZATION_CODE) - } - - // Work-around for issue https://github.com/scribejava/scribejava/issues/641 - request.addHeader("Content-Type", "application/x-www-form-urlencoded") - request - } - } - } - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala deleted file mode 100644 index 98e37ea..0000000 --- a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.services.security.oauth2.impl - -import scala.concurrent.{ExecutionContext, Future, Promise} - -import com.typesafe.config.Config -import com.github.scribejava.core.builder.api.DefaultApi20 -import com.github.scribejava.core.model._ -import com.github.scribejava.core.oauth.OAuth20Service -import com.ning.http.client -import com.ning.http.client.{AsyncCompletionHandler, AsyncHttpClient} -import spray.json.{JsString, _} -import sun.misc.BASE64Encoder - -import io.gearpump.services.SecurityService.UserSession -import io.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator.BaseApi20 -import io.gearpump.util.Constants._ - -/** - * - * Does authentication with CloudFoundry UAA service. Currently it only - * extract the email address of end user. - * - * For what is UAA, - * See [[https://github.com/cloudfoundry/uaa for information about CloudFoundry UAA]] - * (User Account and Authentication Service) - * - * Pre-requisite steps to use this Authenticator: - * - * Step1: Register your website to UAA with tool uaac. 
- * 1) Check tutorial on uaac at - * [[https://docs.cloudfoundry.org/adminguide/uaa-user-management.html]] - * - * 2) Open a bash shell, set the UAA server by command `uaac target` - * {{{ - * uaac target [your uaa server url] - * }}} - * - * NOTE: [your uaa server url] should match the uaahost settings in gear.conf - * - * 3) Login in as user admin by - * {{{ - * uaac token client get admin -s MyAdminPassword - * }}} - * - * 4) Create a new Application (Client) in UAA, - * {{{ - * uaac client add [your_client_id] - * --scope "openid cloud_controller.read" - * --authorized_grant_types "authorization_code client_credentials refresh_token" - * --authorities "openid cloud_controller.read" - * --redirect_uri [your_redirect_url] - * --autoapprove true - * --secret [your_client_secret] - * }}} - * - * Step2: Configure the OAuth2 information in gear.conf - * - * 1) Enable OAuth2 authentication by setting "gearpump.ui-security.oauth2-authenticator-enabled" - * as true. - * - * 2) Navigate to section "gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa" - * - * 3) Config gear.conf "gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa" section. - * Please make sure class name, client ID, client Secret, and callback URL are set properly. - * - * NOTE: The callback URL here should match what you set on CloudFoundry UAA in step1. - * - * Step3: Restart the UI service and try the "social login" button for UAA. - * - * NOTE: OAuth requires Internet access, @see - * [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] to find tutorials to - * configure Internet proxy. - * - * See [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] for more background - * information of OAuth2. 
- */ -class CloudFoundryUAAOAuth2Authenticator extends BaseOAuth2Authenticator { - - import io.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator._ - - private var host: String = null - - protected override def authorizeUrl: String = - s"$host/oauth/authorize?response_type=%s&client_id=%s&redirect_uri=%s&scope=%s" - - protected override def accessTokenEndpoint: String = s"$host/oauth/token" - - protected override def protectedResourceUrl: String = s"$host/userinfo" - - protected override def scope: String = "openid,cloud_controller.read" - - private var additionalAuthenticator: Option[AdditionalAuthenticator] = None - - override def init(config: Config, executionContext: ExecutionContext): Unit = { - host = config.getString("uaahost") - super.init(config, executionContext) - - if (config.getBoolean(ADDITIONAL_AUTHENTICATOR_ENABLED)) { - val additionalAuthenticatorConfig = config.getConfig(ADDITIONAL_AUTHENTICATOR) - val authenticatorClass = additionalAuthenticatorConfig - .getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS) - val clazz = Thread.currentThread().getContextClassLoader.loadClass(authenticatorClass) - val authenticator = clazz.newInstance().asInstanceOf[AdditionalAuthenticator] - authenticator.init(additionalAuthenticatorConfig, executionContext) - additionalAuthenticator = Option(authenticator) - } - } - - protected override def extractUserName(body: String): String = { - val email = body.parseJson.asJsObject.fields("email").asInstanceOf[JsString] - email.value - } - - protected override def oauth2Api(): DefaultApi20 = { - new CloudFoundryUAAService(authorizeUrl, accessTokenEndpoint) - } - - protected override def authenticateWithAccessToken(accessToken: OAuth2AccessToken) - : Future[UserSession] = { - - implicit val ec: ExecutionContext = executionContext - - if (additionalAuthenticator.isDefined) { - super.authenticateWithAccessToken(accessToken).flatMap { user => - 
additionalAuthenticator.get.authenticate(oauthService.getAsyncHttpClient, accessToken, user) - } - } else { - super.authenticateWithAccessToken(accessToken) - } - } -} - -object CloudFoundryUAAOAuth2Authenticator { - private val RESPONSE_TYPE = "response_type" - - val ADDITIONAL_AUTHENTICATOR_ENABLED = "additional-authenticator-enabled" - val ADDITIONAL_AUTHENTICATOR = "additional-authenticator" - - private class CloudFoundryUAAService(authorizeUrl: String, accessTokenEndpoint: String) - extends BaseApi20(authorizeUrl, accessTokenEndpoint) { - - private def base64(in: String): String = { - val encoder = new BASE64Encoder() - val utf8 = "UTF-8" - encoder.encode(in.getBytes(utf8)) - } - - override def createService(config: OAuthConfig): OAuth20Service = { - new OAuth20Service(this, config) { - - protected override def createAccessTokenRequest[T <: AbstractRequest]( - code: String, request: T): T = { - val config: OAuthConfig = getConfig() - - request.addParameter(OAuthConstants.GRANT_TYPE, OAuthConstants.AUTHORIZATION_CODE) - request.addParameter(OAuthConstants.CODE, code) - request.addParameter(RESPONSE_TYPE, "token") - request.addParameter(OAuthConstants.REDIRECT_URI, config.getCallback) - - // Work around issue https://github.com/scribejava/scribejava/issues/641 - request.addHeader("Content-Type", "application/x-www-form-urlencoded") - - // CloudFoundry requires a Authorization header encoded with client Id and secret. - val authorizationHeader = "Basic " + base64(config.getApiKey + ":" + config.getApiSecret) - request.addHeader("Authorization", authorizationHeader) - request - } - } - } - } - - /** - * Additional authenticator to check more credential attributes of user before logging in. - * This authenticator is applied AFTER user pass the initial (default) authenticator. - */ - trait AdditionalAuthenticator { - - /** - * Initialization - * - * @param config Configurations specifically used for this authenticator. 
- * @param executionContext Execution Context to use to run futures. - */ - def init(config: Config, executionContext: ExecutionContext): Unit - - /** - * - * @param accessToken, the accessToken for the UAA - * @param user user session returned by previous authenticator - * @return an updated UserSession - */ - def authenticate( - asyncClient: AsyncHttpClient, accessToken: OAuth2AccessToken, user: UserSession) - : Future[UserSession] - } - - val ORGANIZATION_URL = "organization-url" - - class OrganizationAccessChecker extends AdditionalAuthenticator { - private var organizationUrl: String = null - private implicit var executionContext: ExecutionContext = null - - override def init(config: Config, executionContext: ExecutionContext): Unit = { - this.organizationUrl = config.getString(ORGANIZATION_URL) - this.executionContext = executionContext - } - - override def authenticate(asyncClient: AsyncHttpClient, accessToken: OAuth2AccessToken, - user: UserSession): Future[UserSession] = { - - val promise = Promise[UserSession]() - val builder = asyncClient.prepareGet(organizationUrl) - builder.addHeader("Authorization", s"bearer ${accessToken.getAccessToken}") - builder.execute(new AsyncCompletionHandler[Unit] { - override def onCompleted(response: client.Response): Unit = { - if (response.getStatusCode == 200) { - promise.success(user) - } else { - promise.failure(new Exception(response.getResponseBody)) - } - } - }) - promise.future - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala deleted file mode 100644 index 
c2b18fd..0000000 --- a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.services.security.oauth2.impl - -import com.github.scribejava.apis.google.GoogleJsonTokenExtractor -import com.github.scribejava.core.builder.api.DefaultApi20 -import com.github.scribejava.core.extractors.TokenExtractor -import com.github.scribejava.core.model._ -import spray.json._ - -/** - * - * Does authentication with Google OAuth2 service. It only extract the email address - * from user profile of Google. - * - * Pre-requisite steps to use this Authenticator: - * - * Step1: Register your website as an OAuth2 Application on Google - * 1) Create an application representing your website at [[https://console.developers.google.com]] - * - * 2) In "API Manager" of your created application, enable API "Google+ API" - * - * 3) Create OAuth client ID for this application. In "Credentials" tab of "API Manager", - * choose "Create credentials", and then select OAuth client ID. Follow the wizard - * to set callback URL, and generate client ID, and client Secret. Callback URL is NOT optional. 
- * - * Step2: Configure the OAuth2 information in gear.conf - * - * 1) Enable OAuth2 authentication by setting "gearpump.ui-security.oauth2-authenticator-enabled" - * as true. - * - * 2) Configure section "gearpump.ui-security.oauth2-authenticators.google". Please make sure - * class name, client ID, client Secret, and callback URL are set properly. - * - * NOTE: callback URL set here should match what is configured on Google in step1. - * - * Step3: Restart the UI service and try out the Google social login button in UI. - * - * NOTE: OAuth requires Internet access, @see - * [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] to find some helpful tutorials - * - * NOTE: Google use scope to define what data can be fetched by OAuth2. Currently we use profile - * [[https://www.googleapis.com/auth/userinfo.email]]. However, Google may change the profile - * in future. - * - * TODO Currently, this doesn't verify the state from Google OAuth2 response. - * - * See [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] for more API information. 
- */ -class GoogleOAuth2Authenticator extends BaseOAuth2Authenticator { - - import io.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator._ - - protected override def authorizeUrl: String = AuthorizeUrl - - protected override def accessTokenEndpoint: String = AccessEndpoint - - protected override def protectedResourceUrl: String = ResourceUrl - - protected override def scope: String = GoogleOAuth2Authenticator.Scope - - protected override def extractUserName(body: String): String = { - val emails = body.parseJson.asJsObject.fields("emails").asInstanceOf[JsArray] - val email = emails.elements(0).asJsObject("Cannot find email account") - .fields("value").asInstanceOf[JsString].value - email - } - - override def oauth2Api(): DefaultApi20 = new AsyncGoogleApi20(authorizeUrl, accessTokenEndpoint) -} - -object GoogleOAuth2Authenticator { - - import io.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator._ - - // scalastyle:off line.size.limit - val AuthorizeUrl = "https://accounts.google.com/o/oauth2/auth?response_type=%s&client_id=%s&redirect_uri=%s&scope=%s" - // scalastyle:on line.size.limit - val AccessEndpoint = "https://www.googleapis.com/oauth2/v4/token" - val ResourceUrl = "https://www.googleapis.com/plus/v1/people/me" - val Scope = "https://www.googleapis.com/auth/userinfo.email" - - private class AsyncGoogleApi20(authorizeUrl: String, accessEndpoint: String) - extends BaseApi20(authorizeUrl, accessEndpoint) { - - override def getAccessTokenExtractor: TokenExtractor[OAuth2AccessToken] = { - GoogleJsonTokenExtractor.instance - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala b/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala deleted 
file mode 100644 index f95474e..0000000 --- a/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.services.util - -import upickle.Js - -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.util.Graph - -object UpickleUtil { - - // For implicit type, we need to add EXPLICIT return type, otherwise, upickle may NOT infer the - // reader type automatically. 
- // See issue https://github.com/lihaoyi/upickle-pprint/issues/102 - implicit val graphReader: upickle.default.Reader[Graph[Int, String]] = { - upickle.default.Reader[Graph[Int, String]] { - case Js.Obj(verties, edges) => - val vertexList = upickle.default.readJs[List[Int]](verties._2) - val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2) - Graph(vertexList, edgeList) - } - } - - implicit val workerIdReader: upickle.default.Reader[WorkerId] = upickle.default.Reader[WorkerId] { - case Js.Str(str) => - WorkerId.parse(str) - } - - implicit val workerIdWriter: upickle.default.Writer[WorkerId] = upickle.default.Writer[WorkerId] { - case workerId: WorkerId => - Js.Str(WorkerId.render(workerId)) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/AdminService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/AdminService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/AdminService.scala new file mode 100644 index 0000000..4d1ba22 --- /dev/null +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/AdminService.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.services

import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.Materializer

// NOTE: This cannot be removed!!!
import org.apache.gearpump.services.util.UpickleUtil._

/**
 * AdminService is for cluster-wide management. It is not tied to any
 * specific application.
 *
 * Intended responsibilities, for example:
 *  - Security management: add user, remove user.
 *  - Configuration management: change configurations.
 *  - Machine management: add worker machines, remove worker machines, and add masters.
 *
 * The only endpoint implemented here is `POST terminate`, which terminates the
 * [[akka.actor.ActorSystem]] this service was created with.
 */

// TODO: Add YARN resource manager capacities to add/remove machines.
class AdminService(override val system: ActorSystem) extends BasicService {

  // Replaces the prefix inherited from BasicService with the Neutral matcher.
  protected override def prefix = Neutral

  /**
   * Builds the admin route.
   *
   * @param mat materializer supplied by the enclosing route; not used by this service
   */
  protected override def doRoute(implicit mat: Materializer) = {
    (path("terminate") & post) {
      // Shutdown is only triggered here; the returned termination future is not awaited.
      system.terminate()
      // NOTE(review): replying 404 after a successful terminate looks surprising, but it is the
      // existing contract and callers may depend on it, so it is preserved as is.
      complete(StatusCodes.NotFound)
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala new file mode 100644 index 0000000..1ca2306 --- /dev/null +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala @@ -0,0 +1,226 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ + +package org.apache.gearpump.services + +import scala.util.{Failure, Success, Try} + +import akka.actor.{ActorRef, ActorSystem} +import akka.http.scaladsl.model.{FormData, Multipart} +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server.Route +import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet +import akka.stream.Materializer +import upickle.default.{read, write} + +import org.apache.gearpump.cluster.AppMasterToMaster.{AppMasterSummary, GeneralAppMasterSummary} +import org.apache.gearpump.cluster.ClientToMaster._ +import org.apache.gearpump.cluster.ClusterConfig +import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest} +import org.apache.gearpump.cluster.MasterToClient._ +import org.apache.gearpump.jarstore.JarStoreService +import org.apache.gearpump.services.AppMasterService.Status +// NOTE: This cannot be removed!!! +import org.apache.gearpump.services.util.UpickleUtil._ +import org.apache.gearpump.streaming.AppMasterToMaster.StallingTasks +import org.apache.gearpump.streaming.appmaster.DagManager._ +import org.apache.gearpump.streaming.appmaster.StreamAppMasterSummary +import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig} +import org.apache.gearpump.util.ActorUtil.{askActor, askAppMaster} +import org.apache.gearpump.util.FileDirective._ +import org.apache.gearpump.util.{Constants, Util} + +/** + * Management service for AppMaster + */ +class AppMasterService(val master: ActorRef, + val jarStore: JarStoreService, override val system: ActorSystem) + extends BasicService { + + private val systemConfig = system.settings.config + private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE) + + protected override def doRoute(implicit mat: Materializer) = pathPrefix("appmaster" / IntNumber) { + appId => { + path("dynamicdag") { + 
parameters(ParamMagnet("args")) { args: String => + def replaceProcessor(dagOperation: DAGOperation): Route = { + onComplete(askAppMaster[DAGOperationResult](master, appId, dagOperation)) { + case Success(value) => + complete(write(value)) + case Failure(ex) => + failWith(ex) + } + } + + val msg = java.net.URLDecoder.decode(args, "UTF-8") + val dagOperation = read[DAGOperation](msg) + (post & entity(as[Multipart.FormData])) { _ => + uploadFile { form => + val jar = form.getFile("jar").map(_.file) + + if (jar.nonEmpty) { + dagOperation match { + case replace: ReplaceProcessor => + val description = replace.newProcessorDescription.copy(jar = + Util.uploadJar(jar.get, jarStore)) + val dagOperationWithJar = replace.copy(newProcessorDescription = description) + replaceProcessor(dagOperationWithJar) + } + } else { + replaceProcessor(dagOperation) + } + } + } ~ (post & entity(as[FormData])) { _ => + replaceProcessor(dagOperation) + } + } + } ~ + path("stallingtasks") { + onComplete(askAppMaster[StallingTasks](master, appId, GetStallingTasks(appId))) { + case Success(value) => + complete(write(value)) + case Failure(ex) => failWith(ex) + } + } ~ + path("errors") { + onComplete(askAppMaster[LastFailure](master, appId, GetLastFailure(appId))) { + case Success(value) => + complete(write(value)) + case Failure(ex) => failWith(ex) + } + } ~ + path("restart") { + post { + onComplete(askActor[SubmitApplicationResult](master, RestartApplication(appId))) { + case Success(_) => + complete(write(Status(true))) + case Failure(ex) => + complete(write(Status(false, ex.getMessage))) + } + } + } ~ + path("config") { + onComplete(askActor[AppMasterConfig](master, QueryAppMasterConfig(appId))) { + case Success(value: AppMasterConfig) => + val config = Option(value.config).map(ClusterConfig.render(_, concise)).getOrElse("{}") + complete(config) + case Failure(ex) => + failWith(ex) + } + } ~ + pathPrefix("executor" / Segment) { executorIdString => + path("config") { + val executorId = 
Integer.parseInt(executorIdString) + onComplete(askAppMaster[ExecutorConfig](master, appId, QueryExecutorConfig(executorId))) { + case Success(value) => + val config = Option(value.config).map(ClusterConfig.render(_, concise)) + .getOrElse("{}") + complete(config) + case Failure(ex) => + failWith(ex) + } + } ~ + pathEnd { + get { + val executorId = Integer.parseInt(executorIdString) + onComplete(askAppMaster[ExecutorSummary](master, appId, + GetExecutorSummary(executorId))) { + case Success(value) => + complete(write(value)) + case Failure(ex) => + failWith(ex) + } + } + } + } ~ + path("metrics" / RestPath) { path => + parameterMap { optionMap => + parameter("aggregator" ? "") { aggregator => + parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption => + val query = QueryHistoryMetrics(path.head.toString, readOption, aggregator, optionMap) + onComplete(askAppMaster[HistoryMetrics](master, appId, query)) { + case Success(value) => + complete(write(value)) + case Failure(ex) => + failWith(ex) + } + } + } + } + } ~ + pathEnd { + get { + parameter("detail" ? 
"false") { detail => + val queryDetails = Try(detail.toBoolean).getOrElse(false) + val request = AppMasterDataDetailRequest(appId) + queryDetails match { + case true => + onComplete(askAppMaster[AppMasterSummary](master, appId, request)) { + case Success(value) => + value match { + case data: GeneralAppMasterSummary => + complete(write(data)) + case data: StreamAppMasterSummary => + complete(write(data)) + } + case Failure(ex) => + failWith(ex) + } + + case false => + onComplete(askActor[AppMasterData](master, AppMasterDataRequest(appId))) { + case Success(value) => + complete(write(value)) + case Failure(ex) => + failWith(ex) + } + } + } + } + } ~ + pathEnd { + delete { + val writer = (result: ShutdownApplicationResult) => { + val output = if (result.appId.isSuccess) { + Map("status" -> "success", "info" -> null) + } else { + Map("status" -> "fail", "info" -> result.appId.failed.get.toString) + } + write(output) + } + onComplete(askActor[ShutdownApplicationResult](master, ShutdownApplication(appId))) { + case Success(result) => + val output = if (result.appId.isSuccess) { + Map("status" -> "success", "info" -> null) + } else { + Map("status" -> "fail", "info" -> result.appId.failed.get.toString) + } + complete(write(output)) + case Failure(ex) => + failWith(ex) + } + } + } + } + } +} + +object AppMasterService { + case class Status(success: Boolean, reason: String = null) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/BasicService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/BasicService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/BasicService.scala new file mode 100644 index 0000000..3846c33 --- /dev/null +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/BasicService.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.services

import scala.concurrent.ExecutionContext

import akka.actor.ActorSystem
import akka.http.scaladsl.model.headers.CacheDirectives.{`max-age`, `no-cache`}
import akka.http.scaladsl.model.headers.`Cache-Control`
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.Materializer

import org.apache.gearpump.util.{Constants, LogUtil}
// NOTE: This cannot be removed!!!
import org.apache.gearpump.services.util.UpickleUtil._

/** Anything that can expose itself as an akka-http [[Route]]. */
trait RouteService {
  def route: Route
}

/**
 * Base trait for REST services. It wraps the route supplied by `doRoute` with response
 * encoding, the service path prefix and the cache header policy, and provides some common
 * utilities (timeout, execution context, logger).
 */
trait BasicService extends RouteService {

  /** Actor system backing this service; also the source of the execution context. */
  implicit def system: ActorSystem

  /** Timeout made implicitly available to implementations. */
  implicit def timeout: akka.util.Timeout = Constants.FUTURE_TIMEOUT

  implicit def ec: ExecutionContext = system.dispatcher

  /** The actual endpoints of the service, matched after `prefix` has been consumed. */
  protected def doRoute(implicit mat: Materializer): Route

  /** Path prefix matched in front of `doRoute`; by default the versioned API root. */
  protected def prefix = Slash ~ "api" / s"$REST_VERSION"

  protected val LOG = LogUtil.getLogger(getClass)

  /** When false (the default), every response carries the no-cache header below. */
  protected def cache = false
  private val noCacheHeader = `Cache-Control`(`no-cache`, `max-age`(0L))

  def route: Route = encodeResponse {
    extractMaterializer { implicit mat =>
      rawPathPrefix(prefix) {
        // Choose the wrapping directive first so the inner route is written only once.
        val cachePolicy = if (cache) pass else respondWithHeader(noCacheHeader)
        cachePolicy {
          doRoute(mat)
        }
      }
    }
  }
}
