http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/dashboard/views/landing/breadcrumbs.js ---------------------------------------------------------------------- diff --git a/services/dashboard/views/landing/breadcrumbs.js b/services/dashboard/views/landing/breadcrumbs.js index 56ed685..52ab8c2 100644 --- a/services/dashboard/views/landing/breadcrumbs.js +++ b/services/dashboard/views/landing/breadcrumbs.js @@ -5,7 +5,7 @@ angular.module('dashboard') - .directive('breadcrumbs', function() { + .directive('breadcrumbs', function () { 'use strict'; return { @@ -13,9 +13,9 @@ angular.module('dashboard') templateUrl: 'views/landing/breadcrumbs.html', replace: true, scope: {}, - controller: ['$scope', function($scope) { + controller: ['$scope', function ($scope) { - $scope.$on('$stateChangeSuccess', function() { + $scope.$on('$stateChangeSuccess', function () { $scope.breadcrumbs = buildBreadcrumbs(); });
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/dashboard/views/landing/header.html ---------------------------------------------------------------------- diff --git a/services/dashboard/views/landing/header.html b/services/dashboard/views/landing/header.html index ddec73d..2f2c27a 100644 --- a/services/dashboard/views/landing/header.html +++ b/services/dashboard/views/landing/header.html @@ -35,7 +35,6 @@ </li> </ul> - <ul class="dropdown nav navbar-nav navbar-right"> <li role="placeholder"></li> <li> http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/dashboard/views/landing/header.js ---------------------------------------------------------------------- diff --git a/services/dashboard/views/landing/header.js b/services/dashboard/views/landing/header.js index 00f55a4..617dfda 100644 --- a/services/dashboard/views/landing/header.js +++ b/services/dashboard/views/landing/header.js @@ -5,7 +5,7 @@ angular.module('dashboard') - .directive('header', function() { + .directive('header', function () { 'use strict'; return { @@ -13,7 +13,7 @@ angular.module('dashboard') templateUrl: 'views/landing/header.html', replace: true, scope: {}, - controller: ['$scope', '$cookies', 'restapi', 'conf', function($scope, $cookies, restapi, conf) { + controller: ['$scope', '$cookies', 'restapi', 'conf', function ($scope, $cookies, restapi, conf) { $scope.clusterMenuItems = [ {text: 'Master', pathPatt: 'master', icon: 'fa fa-laptop'}, {text: 'Workers', pathPatt: 'workers', icon: 'fa fa-server'} @@ -28,7 +28,7 @@ angular.module('dashboard') ]; $scope.version = 'beta'; - restapi.serviceVersion(function(version) { + restapi.serviceVersion(function (version) { $scope.version = version; }); }] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/dashboard/views/service_unreachable_notice.html ---------------------------------------------------------------------- diff --git a/services/dashboard/views/service_unreachable_notice.html b/services/dashboard/views/service_unreachable_notice.html index 916124c..1835403 100644 --- a/services/dashboard/views/service_unreachable_notice.html +++ b/services/dashboard/views/service_unreachable_notice.html @@ -6,7 +6,8 @@ The server is unreachable</h4> </div> <div class="modal-body"> - <h5 style="line-height: 150%; margin-top: 0">Sorry, we couldn't complete the request. Please check your network connectivity or + <h5 style="line-height: 150%; margin-top: 0">Sorry, we couldn't complete the request. Please + check your network connectivity or contact the administrator for assistance.</h5> </div> </div> http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/dashboard/widgets/metrics_period_switcher.js ---------------------------------------------------------------------- diff --git a/services/dashboard/widgets/metrics_period_switcher.js b/services/dashboard/widgets/metrics_period_switcher.js index b40aa23..de9b62c 100644 --- a/services/dashboard/widgets/metrics_period_switcher.js +++ b/services/dashboard/widgets/metrics_period_switcher.js @@ -5,7 +5,7 @@ angular.module('dashboard') - .directive('metricsPeriodSwitcher', function() { + .directive('metricsPeriodSwitcher', function () { 'use strict'; return { @@ -15,7 +15,7 @@ angular.module('dashboard') pastHours: '=', viewCurrent: '=' }, - link: function(scope) { + link: function (scope) { 'use strict'; scope.options = { @@ -23,7 +23,7 @@ angular.module('dashboard') hist: 'Past %d Hours'.replace('%d', scope.pastHours) }; scope.value = scope.viewCurrent ? 'current' : 'hist'; - scope.$watch('value', function(value) { + scope.$watch('value', function (value) { scope.viewCurrent = String(value) === 'current'; }); } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/dashboard/widgets/radio_group.js ---------------------------------------------------------------------- diff --git a/services/dashboard/widgets/radio_group.js b/services/dashboard/widgets/radio_group.js index aa5427d..cc46788 100644 --- a/services/dashboard/widgets/radio_group.js +++ b/services/dashboard/widgets/radio_group.js @@ -5,7 +5,7 @@ angular.module('dashboard') - .directive('radioGroup', function() { + .directive('radioGroup', function () { 'use strict'; return { @@ -16,11 +16,11 @@ angular.module('dashboard') ngModel: '=', options: '=' }, - link: function(scope, elem, attrs) { + link: function (scope, elem, attrs) { 'use strict'; scope.buttonWidth = attrs.buttonWidth || '96px'; - scope.toggle = function(value) { + scope.toggle = function (value) { scope.ngModel = value; }; } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/js/src/main/scala/io/gearpump/dashboard/DashboardApp.scala ---------------------------------------------------------------------- diff --git a/services/js/src/main/scala/io/gearpump/dashboard/DashboardApp.scala b/services/js/src/main/scala/io/gearpump/dashboard/DashboardApp.scala index 51e6f0d..3d80c4e 100644 --- a/services/js/src/main/scala/io/gearpump/dashboard/DashboardApp.scala +++ b/services/js/src/main/scala/io/gearpump/dashboard/DashboardApp.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala b/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala index aa88d13..eb6531f 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,32 +18,34 @@ package io.gearpump.services -import akka.actor.{ActorSystem} +import akka.actor.ActorSystem import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Directives._ -import akka.stream.{Materializer} -import io.gearpump.util.{Constants, Util} +import akka.stream.Materializer + +// NOTE: This cannot be removed!!! import io.gearpump.services.util.UpickleUtil._ + /** * AdminService is for cluster-wide managements. it is not related with * specific application. * * For example: - * Security management: Add user, remove user. - * Configuration management: Change configurations. - * Machine management: Add worker machines, remove worker machines, and add masters. + * - Security management: Add user, remove user. + * - Configuration management: Change configurations. + * - Machine management: Add worker machines, remove worker machines, and add masters. */ // TODO: Add YARN resource manager capacities to add/remove machines. class AdminService(override val system: ActorSystem) extends BasicService { - override def prefix = Neutral + protected override def prefix = Neutral - override def doRoute(implicit mat: Materializer) = { + protected override def doRoute(implicit mat: Materializer) = { path("terminate") { post { - system.shutdown() + system.terminate() complete(StatusCodes.NotFound) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala index 30ec061..060e780 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,12 +18,16 @@ package io.gearpump.services -import akka.actor.{ActorSystem, ActorRef} +import scala.util.{Failure, Success, Try} + +import akka.actor.{ActorRef, ActorSystem} import akka.http.scaladsl.model.{FormData, Multipart} import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet -import akka.stream.{Materializer, ActorMaterializer} +import akka.stream.Materializer +import upickle.default.{read, write} + import io.gearpump.cluster.AppMasterToMaster.{AppMasterSummary, GeneralAppMasterSummary} import io.gearpump.cluster.ClientToMaster._ import io.gearpump.cluster.ClusterConfig @@ -31,6 +35,8 @@ import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetail import io.gearpump.cluster.MasterToClient._ import io.gearpump.jarstore.JarStoreService import io.gearpump.services.AppMasterService.Status +// NOTE: This cannot be removed!!! +import io.gearpump.services.util.UpickleUtil._ import io.gearpump.streaming.AppMasterToMaster.StallingTasks import io.gearpump.streaming.appmaster.DagManager._ import io.gearpump.streaming.appmaster.StreamAppMasterSummary @@ -38,9 +44,6 @@ import io.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, import io.gearpump.util.ActorUtil.{askActor, askAppMaster} import io.gearpump.util.FileDirective._ import io.gearpump.util.{Constants, Util} -import upickle.default.{read, write} -import io.gearpump.services.util.UpickleUtil._ -import scala.util.{Failure, Success, Try} /** * Management service for AppMaster @@ -52,40 +55,42 @@ class AppMasterService(val master: ActorRef, private val systemConfig = system.settings.config private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE) - override def doRoute(implicit mat: Materializer) = pathPrefix("appmaster" / IntNumber) { appId => - path("dynamicdag") { - parameters(ParamMagnet("args")) { args: String => - def replaceProcessor(dagOperation: DAGOperation): Route = { - onComplete(askAppMaster[DAGOperationResult](master, appId, dagOperation)) { - case Success(value) => - complete(write(value)) - case Failure(ex) => - failWith(ex) + protected override def doRoute(implicit mat: Materializer) = pathPrefix("appmaster" / IntNumber) { + appId => { + path("dynamicdag") { + parameters(ParamMagnet("args")) { args: String => + def replaceProcessor(dagOperation: DAGOperation): Route = { + onComplete(askAppMaster[DAGOperationResult](master, appId, dagOperation)) { + case Success(value) => + complete(write(value)) + case Failure(ex) => + failWith(ex) + } } - } - val msg = java.net.URLDecoder.decode(args) - val dagOperation = read[DAGOperation](msg) - (post & entity(as[Multipart.FormData])) { _ => - uploadFile { form => - val jar = form.getFile("jar").map(_.file) + val msg = java.net.URLDecoder.decode(args, "UTF-8") + val dagOperation = read[DAGOperation](msg) + (post & entity(as[Multipart.FormData])) { _ => + uploadFile { form => + val jar = form.getFile("jar").map(_.file) - if (jar.nonEmpty) { - dagOperation match { - case replace: ReplaceProcessor => - val description = replace.newProcessorDescription.copy(jar = Util.uploadJar(jar.get, jarStore)) - val dagOperationWithJar = replace.copy(newProcessorDescription = description) - replaceProcessor(dagOperationWithJar) + if (jar.nonEmpty) { + dagOperation match { + case replace: ReplaceProcessor => + val description = replace.newProcessorDescription.copy(jar = + Util.uploadJar(jar.get, jarStore)) + val dagOperationWithJar = replace.copy(newProcessorDescription = description) + replaceProcessor(dagOperationWithJar) + } + } else { + replaceProcessor(dagOperation) } - } else { - replaceProcessor(dagOperation) } + } ~ (post & entity(as[FormData])) { _ => + replaceProcessor(dagOperation) } - } ~ (post & entity(as[FormData])) { _ => - replaceProcessor(dagOperation) - } - } - } ~ + } + } ~ path("stallingtasks") { onComplete(askAppMaster[StallingTasks](master, appId, GetStallingTasks(appId))) { case Success(value) => @@ -124,23 +129,25 @@ class AppMasterService(val master: ActorRef, val executorId = Integer.parseInt(executorIdString) onComplete(askAppMaster[ExecutorConfig](master, appId, QueryExecutorConfig(executorId))) { case Success(value) => - val config = Option(value.config).map(ClusterConfig.render(_, concise)).getOrElse("{}") + val config = Option(value.config).map(ClusterConfig.render(_, concise)) + .getOrElse("{}") complete(config) case Failure(ex) => failWith(ex) } } ~ - pathEnd { - get { - val executorId = Integer.parseInt(executorIdString) - onComplete(askAppMaster[ExecutorSummary](master, appId, GetExecutorSummary(executorId))) { + pathEnd { + get { + val executorId = Integer.parseInt(executorIdString) + onComplete(askAppMaster[ExecutorSummary](master, appId, + GetExecutorSummary(executorId))) { case Success(value) => complete(write(value)) case Failure(ex) => failWith(ex) + } } } - } } ~ path("metrics" / RestPath) { path => parameterMap { optionMap => @@ -210,6 +217,7 @@ class AppMasterService(val master: ActorRef, } } } + } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/BasicService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/BasicService.scala b/services/jvm/src/main/scala/io/gearpump/services/BasicService.scala index 079afc3..75f033b 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/BasicService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/BasicService.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,28 +18,31 @@ package io.gearpump.services -import akka.actor.{ActorSystem} +import scala.concurrent.ExecutionContext + +import akka.actor.ActorSystem import akka.http.scaladsl.model.headers.CacheDirectives.{`max-age`, `no-cache`} import akka.http.scaladsl.model.headers.`Cache-Control` import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server.{Route} -import akka.stream.{Materializer} -import io.gearpump.util.{LogUtil, Constants} +import akka.http.scaladsl.server.Route +import akka.stream.Materializer -import scala.concurrent.ExecutionContext +import io.gearpump.util.{Constants, LogUtil} +// NOTE: This cannot be removed!!! +import io.gearpump.services.util.UpickleUtil._ trait RouteService { def route: Route } /** - * Wrap the cache behavior, and some common utils. + * Wraps the cache behavior, and some common utils. */ -trait BasicService extends RouteService{ +trait BasicService extends RouteService { implicit def system: ActorSystem - implicit def timeout = Constants.FUTURE_TIMEOUT + implicit def timeout: akka.util.Timeout = Constants.FUTURE_TIMEOUT implicit def ec: ExecutionContext = system.dispatcher @@ -53,7 +56,7 @@ trait BasicService extends RouteService{ private val noCacheHeader = `Cache-Control`(`no-cache`, `max-age`(0L)) def route: Route = encodeResponse { - extractMaterializer {implicit mat => + extractMaterializer { implicit mat => rawPathPrefix(prefix) { if (cache) { doRoute(mat) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala index 91f25fe..6ca0f98 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,13 +16,15 @@ * limitations under the License. */ - package io.gearpump.services import java.io.{File, IOException} -import java.nio.file.Files import java.nio.charset.StandardCharsets.UTF_8 -import java.nio.file.StandardOpenOption.{WRITE, APPEND} +import java.nio.file.Files +import java.nio.file.StandardOpenOption.{APPEND, WRITE} +import scala.collection.JavaConverters._ +import scala.concurrent.Future +import scala.util.{Failure, Success} import akka.actor.{ActorRef, ActorSystem} import akka.http.scaladsl.server.Directives._ @@ -30,6 +32,7 @@ import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet import akka.http.scaladsl.unmarshalling.Unmarshaller._ import akka.stream.Materializer import com.typesafe.config.Config + import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData} import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ReadOption} import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest, WorkerList} @@ -38,18 +41,16 @@ import io.gearpump.cluster.client.ClientContext import io.gearpump.cluster.worker.WorkerSummary import io.gearpump.cluster.{ClusterConfig, UserConfig} import io.gearpump.jarstore.JarStoreService -import io.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest} import io.gearpump.partitioner.{PartitionerByClassName, PartitionerDescription} +import io.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest} +// NOTE: This cannot be removed!!! +import io.gearpump.services.util.UpickleUtil._ import io.gearpump.streaming.{ProcessorDescription, ProcessorId, StreamApplication} import io.gearpump.util.ActorUtil._ import io.gearpump.util.FileDirective._ import io.gearpump.util.{Constants, Graph, Util} -import io.gearpump.services.util.UpickleUtil._ - -import scala.collection.JavaConversions._ -import scala.concurrent.Future -import scala.util.{Failure, Success} +/** Manages service for master node */ class MasterService(val master: ActorRef, val jarStore: JarStoreService, override val system: ActorSystem) extends BasicService { @@ -59,7 +60,7 @@ class MasterService(val master: ActorRef, private val systemConfig = system.settings.config private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE) - override def doRoute(implicit mat: Materializer) = pathPrefix("master") { + protected override def doRoute(implicit mat: Materializer) = pathPrefix("master") { pathEnd { get { onComplete(askActor[MasterData](master, GetMasterData)) { @@ -76,16 +77,17 @@ class MasterService(val master: ActorRef, } } ~ path("workerlist") { - def future = askActor[WorkerList](master, GetAllWorkers).flatMap { workerList => - val workers = workerList.workers - val workerDataList = List.empty[WorkerSummary] - - Future.fold(workers.map { workerId => - askWorker[WorkerData](master, workerId, GetWorkerData(workerId)) - })(workerDataList) { (workerDataList, workerData) => - workerDataList :+ workerData.workerDescription + def future: Future[List[WorkerSummary]] = askActor[WorkerList](master, GetAllWorkers) + .flatMap { workerList => + val workers = workerList.workers + val workerDataList = List.empty[WorkerSummary] + + Future.fold(workers.map { workerId => + askWorker[WorkerData](master, workerId, GetWorkerData(workerId)) + })(workerDataList) { (workerDataList, workerData) => + workerDataList :+ workerData.workerDescription + } } - } onComplete(future) { case Success(result: List[WorkerSummary]) => complete(write(result)) case Failure(ex) => failWith(ex) @@ -221,9 +223,11 @@ object MasterService { case class Status(success: Boolean, reason: String = null) /** - * Submit Native Application. + * Submits Native Application. */ - def submitGearApp(jar: Option[File], executorNum: Int, args: String, systemConfig: Config, userConfigFile: Option[File]): Boolean = { + def submitGearApp( + jar: Option[File], executorNum: Int, args: String, + systemConfig: Config, userConfigFile: Option[File]): Boolean = { submitAndDeleteTempFiles( "io.gearpump.cluster.main.AppSubmitter", argsArray = Array("-executors", executorNum.toString) ++ spaceSeparatedArgumentsToArray(args), @@ -235,9 +239,10 @@ object MasterService { } /** - * Submit Storm application. + * Submits Storm application. */ - def submitStormApp(jar: Option[File], stormConf: Option[File], args: String, systemConfig: Config): Boolean = { + def submitStormApp( + jar: Option[File], stormConf: Option[File], args: String, systemConfig: Config): Boolean = { submitAndDeleteTempFiles( "io.gearpump.experiments.storm.main.GearpumpStormClient", argsArray = spaceSeparatedArgumentsToArray(args), @@ -248,9 +253,10 @@ object MasterService { ) } - private def submitAndDeleteTempFiles(mainClass: String, argsArray: Array[String], fileMap: Map[String, File], - classPath: Array[String], systemConfig: Config, - userConfigFile: Option[File] = None): Boolean = { + private def submitAndDeleteTempFiles( + mainClass: String, argsArray: Array[String], fileMap: Map[String, File], + classPath: Array[String], systemConfig: Config, + userConfigFile: Option[File] = None): Boolean = { try { val jar = fileMap.get("jar") if (jar.isEmpty) { @@ -279,7 +285,7 @@ object MasterService { } /** - * Return Java options for gearpump cluster + * Returns Java options for gearpump cluster */ private def clusterOptions(systemConfig: Config, userConfigFile: Option[File]): Array[String] = { var options = Array( @@ -288,7 +294,8 @@ object MasterService { s"-D${Constants.PREFER_IPV4}=true" ) - val masters = systemConfig.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).flatMap(Util.parseHostList) + val masters = systemConfig.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala + .toList.flatMap(Util.parseHostList) options ++= masters.zipWithIndex.map { case (master, index) => s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.$index=${master.host}:${master.port}" }.toArray[String] @@ -311,7 +318,7 @@ object MasterService { } /** - * Return a space separated arguments as an array. + * Returns a space separated arguments as an array. */ private def spaceSeparatedArgumentsToArray(str: String): Array[String] = { str.split(" +").filter(_.nonEmpty) @@ -335,9 +342,9 @@ object MasterService { ) } - case class SubmitApplicationRequest ( - appName: String, - processors: Map[ProcessorId, ProcessorDescription], - dag: Graph[Int, String], - userconfig: UserConfig) + case class SubmitApplicationRequest( + appName: String, + processors: Map[ProcessorId, ProcessorDescription], + dag: Graph[Int, String], + userconfig: UserConfig) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala b/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala index a44e0a9..d0fea30 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,20 +18,25 @@ package io.gearpump.services +import scala.concurrent.Await +import scala.concurrent.duration._ + import akka.actor.{ActorRef, ActorSystem} import akka.http.scaladsl.model.StatusCodes._ import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.{Route, _} import akka.stream.ActorMaterializer import akka.util.Timeout -import io.gearpump.jarstore.JarStoreService -import io.gearpump.util.{Constants, LogUtil} import org.apache.commons.lang.exception.ExceptionUtils -import scala.concurrent.Await -import scala.concurrent.duration._ +import io.gearpump.jarstore.JarStoreService +import io.gearpump.util.{Constants, LogUtil} +// NOTE: This cannot be removed!!! +import io.gearpump.services.util.UpickleUtil._ -class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem) extends RouteService { +/** Contains all REST API service endpoints */ +class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem) + extends RouteService { implicit val timeout = Constants.FUTURE_TIMEOUT @@ -42,9 +47,11 @@ class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem private val LOG = LogUtil.getLogger(getClass) - private val securityEnabled = config.getBoolean(Constants.GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED) + private val securityEnabled = config.getBoolean( + Constants.GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED) - private val supervisorPath = system.settings.config.getString(Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH) + private val supervisorPath = system.settings.config.getString( + Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH) private val myExceptionHandler: ExceptionHandler = ExceptionHandler { case ex: Throwable => { @@ -55,7 +62,7 @@ class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem } } - // make sure staticRoute is the final one, as it will try to lookup resource in local path + // Makes sure staticRoute is the final one, as it will try to lookup resource in local path // if there is no match in previous routes private val static = new StaticService(system, supervisorPath).route http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala b/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala index a5bbe2f..b1abf62 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,54 +18,57 @@ package io.gearpump.services -import akka.actor.{ActorSystem} -import akka.http.scaladsl.model.{Uri, StatusCodes, RemoteAddress} -import akka.http.scaladsl.model.headers.{HttpCookiePair, HttpCookie, HttpChallenge} -import akka.http.scaladsl.server.AuthenticationFailedRejection.{CredentialsRejected, CredentialsMissing} -import akka.http.scaladsl.server._ +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success, Try} + +import akka.actor.ActorSystem +import akka.http.scaladsl.model.headers.{HttpChallenge, HttpCookie, HttpCookiePair} +import akka.http.scaladsl.model.{RemoteAddress, StatusCodes, Uri} +import akka.http.scaladsl.server.AuthenticationFailedRejection.{CredentialsMissing, CredentialsRejected} import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server._ import akka.http.scaladsl.server.directives.FormFieldDirectives.FieldMagnet import akka.stream.Materializer -import com.softwaremill.session.ClientSessionManagerMagnet._ -import com.softwaremill.session.SessionDirectives._ -import com.softwaremill.session._ import com.typesafe.config.Config +import com.softwaremill.session.SessionDirectives._ +import com.softwaremill.session.SessionOptions._ +import com.softwaremill.session.{MultiValueSessionSerializer, SessionConfig, SessionManager} +import upickle.default.write + +import io.gearpump.security.{Authenticator => BaseAuthenticator} import io.gearpump.services.SecurityService.{User, UserSession} import io.gearpump.services.security.oauth2.OAuth2Authenticator import io.gearpump.util.{Constants, LogUtil} -import upickle.default.{write} +// NOTE: This cannot be removed!!! import io.gearpump.services.util.UpickleUtil._ -import io.gearpump.security.{Authenticator => BaseAuthenticator} -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success} - /** - * When user cannot be authenticated, will reject with 401 AuthenticationFailedRejection - * When user can be authenticated, but are not authorized to access certail resource, will - * return a 405 AuthorizationFailedRejection. - * - * When web UI frontend receive 401, it should redirect the UI to login page. - * When web UI receive 405,it should display errors like - * "current user is not authorized to access this resource." + * Security authentication endpoint. * - * The Authenticator used is pluggable, the current Authenticator is resolved by looking up config path - * [[Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS]]. + * - When user cannot be authenticated, will reject with 401 AuthenticationFailedRejection + * - When user can be authenticated, but are not authorized to access certail resource, will + * return a 405 AuthorizationFailedRejection. + * - When web UI frontend receive 401, it should redirect the UI to login page. + * - When web UI receive 405,it should display errors like + * "current user is not authorized to access this resource." * - * see [[BaseAuthenticator]] to find more info on custom Authenticator. + * The Authenticator used is pluggable, the current Authenticator is resolved by looking up + * config path [[io.gearpump.util.Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS]]. * + * See [[io.gearpump.security.Authenticator]] to find more info on custom Authenticator. */ class SecurityService(inner: RouteService, implicit val system: ActorSystem) extends RouteService { // Use scheme "GearpumpBasic" to avoid popping up web browser native authentication box. - private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = "gearpump", params = Map.empty) + private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = "gearpump", + params = Map.empty) val LOG = LogUtil.getLogger(getClass, "AUDIT") private val config = system.settings.config private val sessionConfig = SessionConfig.fromConfig(config) - private implicit val sessionManager = new SessionManager[UserSession](sessionConfig) - private val magnet = ClientSessionManagerMagnet.forSessionManager[UserSession, Unit](Unit) + private implicit val sessionManager: SessionManager[UserSession] = + new SessionManager[UserSession](sessionConfig) private val authenticator = { val clazz = Class.forName(config.getString(Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS)) @@ -74,7 +77,7 @@ class SecurityService(inner: RouteService, implicit val system: ActorSystem) ext authenticator } - private def configToMap(config : Config, path: String) = { + private def configToMap(config: Config, path: String) = { import scala.collection.JavaConverters._ config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString } } @@ -91,8 +94,9 @@ class SecurityService(inner: RouteService, implicit val system: ActorSystem) ext } } - private def authenticate(user: String, pass: String)(implicit ec: ExecutionContext): Future[Option[UserSession]] = { - authenticator.authenticate(user, pass, ec).map{ result => + private def authenticate(user: String, pass: String)(implicit ec: ExecutionContext) + : Future[Option[UserSession]] = { + authenticator.authenticate(user, pass, ec).map { result => if (result.authenticated) { Some(UserSession(user, result.permissionLevel)) } else { @@ -110,7 +114,7 @@ class SecurityService(inner: RouteService, implicit val system: ActorSystem) ext } private def requireAuthentication(inner: UserSession => Route): Route = { - optionalSession(magnet) { sessionOption => + optionalSession(oneOff, usingCookiesOrHeaders) { sessionOption => sessionOption match { case Some(session) => { inner(session) @@ -122,10 +126,12 @@ class SecurityService(inner: RouteService, implicit val system: ActorSystem) ext } private def login(session: UserSession, ip: String, redirectToRoot: Boolean = false): Route = { - setSession(session) { + setSession(oneOff, usingCookies, session) { val user = session.user - val maxAgeMs = 1000 * sessionConfig.clientSessionMaxAgeSeconds.getOrElse(24 * 3600L) // default 1 day - setCookie(HttpCookie.fromPair(HttpCookiePair("username", user), path = Some("/"), maxAge = Some(maxAgeMs))) { + // Default: 1 day + val maxAgeMs = 1000 * sessionConfig.sessionMaxAgeSeconds.getOrElse(24 * 3600L) + setCookie(HttpCookie.fromPair(HttpCookiePair("username", user), path = Some("/"), + maxAge = Some(maxAgeMs))) { LOG.info(s"user $user login from $ip") if (redirectToRoot) { redirect(Uri("/"), StatusCodes.TemporaryRedirect) @@ -137,7 +143,7 @@ class SecurityService(inner: RouteService, implicit val system: ActorSystem) ext } private def logout(user: UserSession, ip: String): Route = { - invalidateSession(magnet) { ctx => + invalidateSession(oneOff, usingCookies) { ctx => LOG.info(s"user ${user.user} logout from $ip") ctx.complete(write(new User(user.user))) } @@ -145,13 +151,13 @@ class SecurityService(inner: RouteService, implicit val system: ActorSystem) ext // Only admin are able to access operation like post/delete/put private def requireAuthorization(user: UserSession, route: => Route): Route = { - // valid user + // Valid user if (user.permissionLevel >= BaseAuthenticator.User.permissionLevel) { route } else { - // possibly a guest or not authenticated. + // Possibly a guest or not authenticated. (put | delete | post) { - // reject with 405 authorization error + // Reject with 405 authorization error reject(AuthorizationFailedRejection) } ~ get { @@ -171,8 +177,8 @@ class SecurityService(inner: RouteService, implicit val system: ActorSystem) ext extractExecutionContext{implicit ec: ExecutionContext => extractMaterializer{implicit mat: Materializer => (extractClientIP | unknownIp) { ip => - pathPrefix("login") { - pathEndOrSingleSlash { + pathPrefix("login") { + pathEndOrSingleSlash { get { getFromResource("login/login.html") } ~ @@ -181,7 +187,7 @@ class SecurityService(inner: RouteService, implicit val system: ActorSystem) ext formField(FieldMagnet('username.as[String])) {user: String => formFields(FieldMagnet('password.as[String])) {pass: String => val result = authenticate(user, pass) - onSuccess(result){ + onSuccess(result) { case Some(session) => login(session, ip.toString) case None => @@ -192,7 +198,7 @@ class SecurityService(inner: RouteService, implicit val system: ActorSystem) ext } } ~ path ("oauth2" / "providers") { - // respond with a list of OAuth2 providers. + // Responds with a list of OAuth2 providers. complete(write(oauth2Providers)) } ~ // Support OAUTH Authentication @@ -207,7 +213,7 @@ class SecurityService(inner: RouteService, implicit val system: ActorSystem) ext def loginWithOAuth2Parameters(parameters: Map[String, String]): Route = { val result = oauthService.authenticate(parameters) - onComplete(result){ + onComplete(result) { case Success(session) => login(session, ip.toString, redirectToRoot = true) case Failure(ex) => { @@ -218,7 +224,7 @@ class SecurityService(inner: RouteService, implicit val system: ActorSystem) ext } path ("authorize") { - // redirect to OAuth2 service provider for authorization. + // Redirects to OAuth2 service provider for authorization. redirect(Uri(oauthService.getAuthorizationUrl), StatusCodes.TemporaryRedirect) } ~ path ("accesstoken") { @@ -255,17 +261,28 @@ class SecurityService(inner: RouteService, implicit val system: ActorSystem) ext object SecurityService { - val SESSION_MANAGER_KEY = "akka.http.session.serverSecret" + val SESSION_MANAGER_KEY = "akka.http.session.server-secret" case class UserSession(user: String, permissionLevel: Int) object UserSession { - implicit def serializer: SessionSerializer[UserSession] = new ToMapSessionSerializer[UserSession] { - private val User = "user" - private val PermissionLevel = "permissionLevel" - override def serializeToMap(t: UserSession) = Map(User -> t.user, PermissionLevel->t.permissionLevel.toString) - override def deserializeFromMap(m: Map[String, String]) = UserSession(m(User), m(PermissionLevel).toInt) + private val User = "user" + private val PermissionLevel = "permissionLevel" + + implicit def serializer: MultiValueSessionSerializer[UserSession] = { + new MultiValueSessionSerializer[UserSession]( + toMap = {t: UserSession => + Map(User -> t.user, PermissionLevel -> t.permissionLevel.toString) + }, + fromMap = {m: Map[String, String] => + if (m.contains(User)) { + Try(UserSession(m(User), m(PermissionLevel).toInt)) + } else { + Failure[UserSession](new Exception("Fail to parse session ")) + } + } + ) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala b/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala index 09180c3..fc990a1 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,11 +18,13 @@ package io.gearpump.services -import akka.actor.{ActorSystem} +import akka.actor.ActorSystem import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Directives._ -import akka.stream.{Materializer} -import io.gearpump.util.{Constants, Util} +import akka.stream.Materializer + +import io.gearpump.util.Util +// NOTE: This cannot be removed!!! import io.gearpump.services.util.UpickleUtil._ /** @@ -33,13 +35,13 @@ class StaticService(override val system: ActorSystem, supervisorPath: String) private val version = Util.version - override def prefix = Neutral + protected override def prefix = Neutral - override def cache = true + override def cache: Boolean = true - override def doRoute(implicit mat: Materializer) = { + protected override def doRoute(implicit mat: Materializer) = { path("version") { - get {ctx => + get { ctx => ctx.complete(version) } } ~ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala b/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala index 5d552d6..c7f19e4 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,39 +18,43 @@ package io.gearpump.services +import scala.concurrent.Future +import scala.util.{Failure, Success} + import akka.actor.{ActorRef, ActorSystem} import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route import akka.stream.Materializer -import io.gearpump.WorkerId -import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData, GetAllWorkers} + +import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData} import io.gearpump.cluster.ClientToMaster._ +import io.gearpump.cluster.worker.WorkerId import io.gearpump.services.SupervisorService.{Path, Status} import io.gearpump.util.ActorUtil._ - -import scala.concurrent.Future -import scala.util.{Failure, Success} -import upickle.default.{read, write} +// NOTE: This cannot be removed!!! import io.gearpump.services.util.UpickleUtil._ -class SupervisorService(val master: ActorRef, val supervisor: ActorRef, override val system: ActorSystem) +/** Responsible for adding/removing machines. Typically it delegates to YARN. */ +class SupervisorService( + val master: ActorRef, val supervisor: ActorRef, override val system: ActorSystem) extends BasicService { import upickle.default.write /** - * TODO: Add additional check to ensure the user have enough authorization to add/remove a worker machine + * TODO: Add additional check to ensure the user have enough authorization to + * add/remove a worker machine */ private def authorize(internal: Route): Route = { if (supervisor == null) { - failWith(new Exception("API not enabled, cannot find a valid supervisor! Please make sure Gearpump is " + - "running on top of YARN or other resource managers")) + failWith(new Exception("API not enabled, cannot find a valid supervisor! " + + "Please make sure Gearpump is running on top of YARN or other resource managers")) } else { internal } } - override def doRoute(implicit mat: Materializer) = pathPrefix("supervisor") { + protected override def doRoute(implicit mat: Materializer) = pathPrefix("supervisor") { pathEnd { get { val path = if (supervisor == null) { @@ -105,7 +109,7 @@ class SupervisorService(val master: ActorRef, val supervisor: ActorRef, override } } -object SupervisorService{ +object SupervisorService { case class Status(enabled: Boolean) case class Path(path: String) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala b/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala index a1edbe4..78cdb37 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,20 +18,23 @@ package io.gearpump.services -import akka.actor.{ActorSystem, ActorRef} +import scala.util.{Failure, Success} + +import akka.actor.{ActorRef, ActorSystem} import akka.http.scaladsl.server.Directives._ -import akka.stream.{Materializer} -import io.gearpump.WorkerId +import akka.stream.Materializer + import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData} -import io.gearpump.cluster.ClientToMaster.{ReadOption, QueryHistoryMetrics, QueryWorkerConfig} +import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ReadOption} import io.gearpump.cluster.ClusterConfig import io.gearpump.cluster.MasterToClient.{HistoryMetrics, WorkerConfig} +import io.gearpump.cluster.worker.WorkerId import io.gearpump.util.ActorUtil._ import io.gearpump.util.Constants +// NOTE: This cannot be removed!!! import io.gearpump.services.util.UpickleUtil._ -import scala.util.{Failure, Success} - +/** Service to handle worker related queries */ class WorkerService(val master: ActorRef, override val system: ActorSystem) extends BasicService { @@ -39,15 +42,17 @@ class WorkerService(val master: ActorRef, override val system: ActorSystem) private val systemConfig = system.settings.config private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE) - override def doRoute(implicit mat: Materializer) = pathPrefix("worker" / Segment) { workerIdString => - pathEnd { - val workerId = WorkerId.parse(workerIdString) - onComplete(askWorker[WorkerData](master, workerId, GetWorkerData(workerId))) { - case Success(value: WorkerData) => - complete(write(value.workerDescription)) - case Failure(ex) => failWith(ex) + protected override def doRoute(implicit mat: Materializer) = pathPrefix("worker" / Segment) { + workerIdString => { + pathEnd { + val workerId = WorkerId.parse(workerIdString) + onComplete(askWorker[WorkerData](master, workerId, GetWorkerData(workerId))) { + case Success(value: WorkerData) => + complete(write(value.workerDescription)) + case Failure(ex) => failWith(ex) + } } - } ~ + }~ path("config") { val workerId = WorkerId.parse(workerIdString) onComplete(askWorker[WorkerConfig](master, workerId, QueryWorkerConfig(workerId))) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala b/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala index 7dc0277..a647a8d 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,21 +18,26 @@ package io.gearpump.services.main -import akka.actor.{ActorSystem} +import java.util.Random +import scala.collection.JavaConverters._ +import scala.concurrent.Await + +import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.server.Route import akka.stream.ActorMaterializer import com.typesafe.config.ConfigValueFactory +import org.slf4j.Logger +import sun.misc.BASE64Encoder + import io.gearpump.cluster.ClusterConfig -import io.gearpump.cluster.main.{Gear, CLIOption, ArgumentsParser} +import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, Gear} import io.gearpump.cluster.master.MasterProxy -import io.gearpump.services.{SecurityService, RestServices} +import io.gearpump.services.{RestServices, SecurityService} import io.gearpump.util.LogUtil.ProcessType import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util} -import org.slf4j.Logger - -import scala.concurrent.Await +/** Command line to start UI server */ object Services extends AkkaApp with ArgumentsParser { private val LOG = LogUtil.getLogger(getClass) @@ -48,11 +53,13 @@ object Services extends AkkaApp with ArgumentsParser { ClusterConfig.ui() } - override def help: Unit = { + override def help(): Unit = { + // scalastyle:off println Console.println("UI Server") + // scalastyle:on println } - private var killFunction: Option[() =>Unit] = None + private var killFunction: Option[() => Unit] = None override def main(inputAkkaConf: Config, args: Array[String]): Unit = { @@ -69,48 +76,59 @@ object Services extends AkkaApp with ArgumentsParser { LogUtil.getLogger(getClass) } - - import scala.collection.JavaConversions._ if (argConfig.exists("master")) { val master = argConfig.getString("master") akkaConf = akkaConf.withValue(Constants.GEARPUMP_CLUSTER_MASTERS, - ConfigValueFactory.fromIterable(List(master))) + ConfigValueFactory.fromIterable(List(master).asJava)) } akkaConf = akkaConf.withValue(Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH, ConfigValueFactory.fromAnyRef(argConfig.getString("supervisor"))) - // Create a random unique secret key for session manager. + // Creates a random unique secret key for session manager. // All previous stored session token cookies will be invalidated when UI // server is restarted. .withValue(SecurityService.SESSION_MANAGER_KEY, - ConfigValueFactory.fromAnyRef(java.util.UUID.randomUUID().toString())) + ConfigValueFactory.fromAnyRef(randomSeverSecret())) - val masterCluster = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).toList.flatMap(Util.parseHostList) + val masterCluster = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala + .flatMap(Util.parseHostList) - implicit val system = ActorSystem("services" , akkaConf) + implicit val system = ActorSystem("services", akkaConf) implicit val executionContext = system.dispatcher import scala.concurrent.duration._ - val master = system.actorOf(MasterProxy.props(masterCluster, 1 day), s"masterproxy${system.name}") + val master = system.actorOf(MasterProxy.props(masterCluster, 1.day), + s"masterproxy${system.name}") val (host, port) = parseHostPort(system.settings.config) - implicit val mat = ActorMaterializer() val services = new RestServices(master, mat, system) val bindFuture = Http().bindAndHandle(Route.handlerFlow(services.route), host, port) - Await.result(bindFuture, 15 seconds) + Await.result(bindFuture, 15.seconds) - val displayHost = if(host == "0.0.0.0") "127.0.0.1" else host + val displayHost = if (host == "0.0.0.0") "127.0.0.1" else host LOG.info(s"Please browse to http://$displayHost:$port to see the web UI") + + // scalastyle:off println println(s"Please browse to http://$displayHost:$port to see the web UI") + // scalastyle:on println - killFunction = Some{() => + killFunction = Some { () => LOG.info("Shutting down UI Server") - system.shutdown() + system.terminate() } - system.awaitTermination() + Await.result(system.whenTerminated, Duration.Inf) + } + + private def randomSeverSecret(): String = { + val random = new Random() + val length = 64 // Required + val bytes = new Array[Byte](length) + random.nextBytes(bytes) + val endecoder = new BASE64Encoder() + endecoder.encode(bytes) } private def parseHostPort(config: Config): (String, Int) = { @@ -120,7 +138,8 @@ object Services extends AkkaApp with ArgumentsParser { } // TODO: fix this - // Hack around for YARN module, so that we can kill the UI server when application is shutting down. + // Hacks around for YARN module, so that we can kill the UI server + // when application is shutting down. def kill(): Unit = { if (killFunction.isDefined) { killFunction.get.apply() http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/package.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/package.scala b/services/jvm/src/main/scala/io/gearpump/services/package.scala index bebcf95..7b38134 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/package.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/package.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala index edaefa7..158d406 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,11 +18,13 @@ package io.gearpump.services.security.oauth2 +import scala.concurrent.{ExecutionContext, Future} + import com.typesafe.config.Config + import io.gearpump.services.SecurityService.UserSession import io.gearpump.util.Constants import io.gearpump.util.Constants._ -import scala.concurrent.{ExecutionContext, Future} /** * @@ -30,46 +32,47 @@ import scala.concurrent.{ExecutionContext, Future} * @see [[https://tools.ietf.org/html/rfc6749]] to find what is OAuth2, and how it works. * * Basically flow for OAuth2 Authentication: - * 1. User accesses Gearpump UI website, and choose to login with OAuth2 server. - * 2. Gearpump UI website redirects user to OAuth2 server domain authorization endpoint. - * 3. End user complete the authorization in the domain of OAuth2 server. - * 4. OAuth2 server redirects user back to Gearpump UI server. - * 5. Gearpump UI server verify the tokens and extract credentials from query - * parameters and form fields. + * 1. User accesses Gearpump UI website, and choose to login with OAuth2 server. + * 2. Gearpump UI website redirects user to OAuth2 server domain authorization endpoint. + * 3. End user complete the authorization in the domain of OAuth2 server. + * 4. OAuth2 server redirects user back to Gearpump UI server. + * 5. Gearpump UI server verify the tokens and extract credentials from query + * parameters and form fields. * - * @note '''Thread-safety''' is a MUST requirement. Developer need to ensure the sub-class is thread-safe. - * Sub-class should have a parameterless constructor. + * NOTE: '''Thread-safety''' is a MUST requirement. Developer need to ensure the sub-class is + * thread-safe. Sub-class should have a parameterless constructor. * - * @note OAuth2 Authenticator requires access of Internet. Please make sure HTTP proxy are + * NOTE: OAuth2 Authenticator requires access of Internet. Please make sure HTTP proxy are * set properly if applied. * - * @example Config proxy when UI server is started on Windows: + * Example: Config proxy when UI server is started on Windows: * {{{ - * > set JAVA_OPTS=-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 -Dhttps.proxyHost=xx.com -Dhttps.proxyPort=8088 + * > set JAVA_OPTS=-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 -Dhttps.proxyHost=xx.com + * -Dhttps.proxyPort=8088 * > bin\services * }}} * - * @example Config proxy when UI server is started on Linux: + * Example: Config proxy when UI server is started on Linux: * {{{ - * $ export JAVA_OPTS="-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 -Dhttps.proxyHost=xx.com -Dhttps.proxyPort=8088" + * $ export JAVA_OPTS="-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 -Dhttps.proxyHost=xx.com + * -Dhttps.proxyPort=8088" * $ bin/services * }}} - * */ trait OAuth2Authenticator { /** * Inits authenticator with config which contains client ID, client secret, and etc.. * - * Typically, the client key and client secret is provided by OAuth2 Authorization server when user - * register an application there. - * @see [[https://tools.ietf.org/html/rfc6749]] for definition of client, client Id, + * Typically, the client key and client secret is provided by OAuth2 Authorization server + * when user register an application there. + * See [[https://tools.ietf.org/html/rfc6749]] for definition of client, client Id, * and client secret. * * See [[https://developer.github.com/v3/oauth/]] for an actual example of how Github * use client key, and client secret. * - * @note '''Thread-Safety''': Framework ensures this call is synchronized. + * NOTE: '''Thread-Safety''': Framework ensures this call is synchronized. * * @param config Client Id, client secret, callback URL and etc.. * @param executionContext ExecutionContext from hosting environment. @@ -80,7 +83,7 @@ trait OAuth2Authenticator { * Returns the OAuth Authorization URL so for redirection to that address to do OAuth2 * authorization. * - * @note '''Thread-Safety''': This can be called in a multi-thread environment. Developer + * NOTE: '''Thread-Safety''': This can be called in a multi-thread environment. Developer * need to ensure thread safety. */ def getAuthorizationUrl: String @@ -89,9 +92,11 @@ trait OAuth2Authenticator { * After authorization, OAuth2 server redirects user back with tokens. This verify the * tokens, retrieve the profiles, and return [[UserSession]] information. * - * @note This is an Async call. - * @note This call requires external internet access. - * @note '''Thread-Safety''': This can be called in a multi-thread environment. Developer + * NOTE: This is an Async call. + * + * NOTE: This call requires external internet access. + * + * NOTE: '''Thread-Safety''': This can be called in a multi-thread environment. Developer * need to ensure thread safety. * * @param parameters HTTP Query and Post parameters, which typically contains Authorization code. @@ -100,7 +105,7 @@ trait OAuth2Authenticator { def authenticate(parameters: Map[String, String]): Future[UserSession] /** - * Clean resource + * Clean up resource */ def close(): Unit } @@ -116,7 +121,8 @@ object OAuth2Authenticator { * @param provider, Name for the OAuth2 Authentication Service. * @return Returns null if the OAuth2 Authentication is disabled. */ - def get(config: Config, provider: String, executionContext: ExecutionContext): OAuth2Authenticator = { + def get(config: Config, provider: String, executionContext: ExecutionContext) + : OAuth2Authenticator = { if (providers.contains(provider)) { providers(provider) @@ -129,7 +135,8 @@ object OAuth2Authenticator { providers(provider) } else { val authenticatorConfig = config.getConfig(path) - val authenticatorClass = authenticatorConfig.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS) + val authenticatorClass = authenticatorConfig.getString( + GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS) val clazz = Thread.currentThread().getContextClassLoader.loadClass(authenticatorClass) val authenticator = clazz.newInstance().asInstanceOf[OAuth2Authenticator] authenticator.init(authenticatorConfig, executionContext) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala index f0ba400..e422d3f 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,14 +19,18 @@ package io.gearpump.services.security.oauth2.impl import java.util.concurrent.atomic.AtomicBoolean +import scala.collection.mutable.StringBuilder +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Success} +import com.typesafe.config.Config import com.github.scribejava.core.builder.ServiceBuilderAsync import com.github.scribejava.core.builder.api.DefaultApi20 import com.github.scribejava.core.model._ import com.github.scribejava.core.oauth.OAuth20Service import com.github.scribejava.core.utils.OAuthEncoder import com.ning.http.client.AsyncHttpClientConfig -import com.typesafe.config.Config + import io.gearpump.security.Authenticator import io.gearpump.services.SecurityService.UserSession import io.gearpump.services.security.oauth2.OAuth2Authenticator @@ -34,14 +38,10 @@ import io.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator.BaseApi import io.gearpump.util.Constants._ import io.gearpump.util.Util -import scala.collection.mutable.StringBuilder -import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.util.{Failure, Success} - /** * Uses Ning AsyncClient to connect to OAuth2 service. * - * @see [[OAuth2Authenticator]] for more API information. + * See [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] for more API information. */ abstract class BaseOAuth2Authenticator extends OAuth2Authenticator { @@ -105,7 +105,6 @@ abstract class BaseOAuth2Authenticator extends OAuth2Authenticator { oauthService.getAuthorizationUrl() } - protected def authenticateWithAccessToken(accessToken: OAuth2AccessToken): Future[UserSession] = { val promise = Promise[UserSession]() val request = new OAuthRequestAsync(Verb.GET, protectedResourceUrl, oauthService) @@ -139,7 +138,7 @@ abstract class BaseOAuth2Authenticator extends OAuth2Authenticator { new OAuthAsyncRequestCallback[OAuth2AccessToken] { override def onCompleted(accessToken: OAuth2AccessToken): Unit = { - authenticateWithAccessToken(accessToken).onComplete{ + authenticateWithAccessToken(accessToken).onComplete { case Success(user) => promise.success(user) case Failure(ex) => promise.failure(ex) } @@ -167,9 +166,11 @@ abstract class BaseOAuth2Authenticator extends OAuth2Authenticator { } } - private def buildOAuth2Service(clientId: String, clientSecret: String, callback: String): OAuth20Service = { - val state: String = "state" + Util.randInt - ScribeJavaConfig.setForceTypeOfHttpRequests(ForceTypeOfHttpRequest.FORCE_ASYNC_ONLY_HTTP_REQUESTS) + private def buildOAuth2Service(clientId: String, clientSecret: String, callback: String) + : OAuth20Service = { + val state: String = "state" + Util.randInt() + ScribeJavaConfig.setForceTypeOfHttpRequests( + ForceTypeOfHttpRequest.FORCE_ASYNC_ONLY_HTTP_REQUESTS) val clientConfig: AsyncHttpClientConfig = new AsyncHttpClientConfig.Builder() .setMaxConnections(5) .setUseProxyProperties(true) @@ -189,7 +190,6 @@ abstract class BaseOAuth2Authenticator extends OAuth2Authenticator { service } - } object BaseOAuth2Authenticator { @@ -200,18 +200,21 @@ object BaseOAuth2Authenticator { } def getAuthorizationUrl(config: OAuthConfig): String = { - val sb: StringBuilder = new StringBuilder(String.format(authorizeUrl, config.getResponseType, config.getApiKey, OAuthEncoder.encode(config.getCallback), OAuthEncoder.encode(config.getScope))) + val sb: StringBuilder = new StringBuilder(String.format(authorizeUrl, + config.getResponseType, config.getApiKey, OAuthEncoder.encode(config.getCallback), + OAuthEncoder.encode(config.getScope))) val state: String = config.getState if (state != null) { sb.append('&').append(OAuthConstants.STATE).append('=').append(OAuthEncoder.encode(state)) } - return sb.toString + sb.toString } override def createService(config: OAuthConfig): OAuth20Service = { - return new OAuth20Service(this, config) { + new OAuth20Service(this, config) { - protected override def createAccessTokenRequest[T <: AbstractRequest](code: String, request: T): T = { + protected override def createAccessTokenRequest[T <: AbstractRequest]( + code: String, request: T): T = { super.createAccessTokenRequest(code, request) if (!getConfig.hasGrantType) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala index 27b66d2..98e37ea 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,44 +18,49 @@ package io.gearpump.services.security.oauth2.impl +import scala.concurrent.{ExecutionContext, Future, Promise} + +import com.typesafe.config.Config import com.github.scribejava.core.builder.api.DefaultApi20 import com.github.scribejava.core.model._ import com.github.scribejava.core.oauth.OAuth20Service import com.ning.http.client import com.ning.http.client.{AsyncCompletionHandler, AsyncHttpClient} -import com.typesafe.config.Config -import io.gearpump.services.SecurityService.UserSession -import io.gearpump.services.security.oauth2.OAuth2Authenticator -import io.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator.BaseApi20 -import io.gearpump.util.Constants._ import spray.json.{JsString, _} import sun.misc.BASE64Encoder -import scala.concurrent.{ExecutionContext, Promise, Future} +import io.gearpump.services.SecurityService.UserSession +import io.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator.BaseApi20 +import io.gearpump.util.Constants._ /** * * Does authentication with CloudFoundry UAA service. Currently it only * extract the email address of end user. * - * For what is UAA, please see: - * @see [[https://github.com/cloudfoundry/uaa for information about CloudFoundry UAA]] + * For what is UAA, + * See [[https://github.com/cloudfoundry/uaa for information about CloudFoundry UAA]] * (User Account and Authentication Service) * * Pre-requisite steps to use this Authenticator: * * Step1: Register your website to UAA with tool uaac. - * 1) Check tutorial on uaac at [[https://docs.cloudfoundry.org/adminguide/uaa-user-management.html]] - * 2) Open a bash shell, set the UAA server by command `uaac target` - * {{{ - * uaac target [your uaa server url] - * }}} + * 1) Check tutorial on uaac at + * [[https://docs.cloudfoundry.org/adminguide/uaa-user-management.html]] + * + * 2) Open a bash shell, set the UAA server by command `uaac target` + * {{{ + * uaac target [your uaa server url] + * }}} + * * NOTE: [your uaa server url] should match the uaahost settings in gear.conf - * 3) Login in as user admin by - * {{{ - * uaac token client get admin -s MyAdminPassword - * }}} - * 4) Create a new Application (Client) in UAA, + * + * 3) Login in as user admin by + * {{{ + * uaac token client get admin -s MyAdminPassword + * }}} + * + * 4) Create a new Application (Client) in UAA, * {{{ * uaac client add [your_client_id] * --scope "openid cloud_controller.read" @@ -67,28 +72,34 @@ import scala.concurrent.{ExecutionContext, Promise, Future} * }}} * * Step2: Configure the OAuth2 information in gear.conf - * 1) Enable OAuth2 authentication by setting "gearpump.ui-security.oauth2-authenticator-enabled" + * + * 1) Enable OAuth2 authentication by setting "gearpump.ui-security.oauth2-authenticator-enabled" * as true. - * 2) Navigate to section "gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa" - * 3) Config gear.conf "gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa" section. + * + * 2) Navigate to section "gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa" + * + * 3) Config gear.conf "gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa" section. * Please make sure class name, client ID, client Secret, and callback URL are set properly. * - * @note The callback URL here should match what you set on CloudFoundry UAA in step1. + * NOTE: The callback URL here should match what you set on CloudFoundry UAA in step1. * * Step3: Restart the UI service and try the "social login" button for UAA. * - * @note OAuth requires Internet access, @see [[OAuth2Authenticator]] to find tutorials to configure - * Internet proxy. + * NOTE: OAuth requires Internet access, @see + * [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] to find tutorials to + * configure Internet proxy. * - * @see [[OAuth2Authenticator]] for more background information of OAuth2. + * See [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] for more background + * information of OAuth2. */ class CloudFoundryUAAOAuth2Authenticator extends BaseOAuth2Authenticator { - import CloudFoundryUAAOAuth2Authenticator._ + import io.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator._ private var host: String = null - protected override def authorizeUrl: String = s"$host/oauth/authorize?response_type=%s&client_id=%s&redirect_uri=%s&scope=%s" + protected override def authorizeUrl: String = + s"$host/oauth/authorize?response_type=%s&client_id=%s&redirect_uri=%s&scope=%s" protected override def accessTokenEndpoint: String = s"$host/oauth/token" @@ -104,7 +115,8 @@ class CloudFoundryUAAOAuth2Authenticator extends BaseOAuth2Authenticator { if (config.getBoolean(ADDITIONAL_AUTHENTICATOR_ENABLED)) { val additionalAuthenticatorConfig = config.getConfig(ADDITIONAL_AUTHENTICATOR) - val authenticatorClass = additionalAuthenticatorConfig.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS) + val authenticatorClass = additionalAuthenticatorConfig + .getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS) val clazz = Thread.currentThread().getContextClassLoader.loadClass(authenticatorClass) val authenticator = clazz.newInstance().asInstanceOf[AdditionalAuthenticator] authenticator.init(additionalAuthenticatorConfig, executionContext) @@ -117,17 +129,17 @@ class CloudFoundryUAAOAuth2Authenticator extends BaseOAuth2Authenticator { email.value } - protected override def oauth2Api(): DefaultApi20 = { new CloudFoundryUAAService(authorizeUrl, accessTokenEndpoint) } - protected override def authenticateWithAccessToken(accessToken: OAuth2AccessToken): Future[UserSession] = { + protected override def authenticateWithAccessToken(accessToken: OAuth2AccessToken) + : Future[UserSession] = { implicit val ec: ExecutionContext = executionContext if (additionalAuthenticator.isDefined) { - super.authenticateWithAccessToken(accessToken).flatMap{user => + super.authenticateWithAccessToken(accessToken).flatMap { user => additionalAuthenticator.get.authenticate(oauthService.getAsyncHttpClient, accessToken, user) } } else { @@ -152,9 +164,10 @@ object CloudFoundryUAAOAuth2Authenticator { } override def createService(config: OAuthConfig): OAuth20Service = { - return new OAuth20Service(this, config) { + new OAuth20Service(this, config) { - protected override def createAccessTokenRequest[T <: AbstractRequest](code: String, request: T): T = { + protected override def createAccessTokenRequest[T <: AbstractRequest]( + code: String, request: T): T = { val config: OAuthConfig = getConfig() request.addParameter(OAuthConstants.GRANT_TYPE, OAuthConstants.AUTHORIZATION_CODE) @@ -181,8 +194,10 @@ object CloudFoundryUAAOAuth2Authenticator { trait AdditionalAuthenticator { /** - * @param config configurations specifically used for this authenticator. - * @param executionContext execution Context to use to run futures. + * Initialization + * + * @param config Configurations specifically used for this authenticator. + * @param executionContext Execution Context to use to run futures. */ def init(config: Config, executionContext: ExecutionContext): Unit @@ -192,7 +207,9 @@ object CloudFoundryUAAOAuth2Authenticator { * @param user user session returned by previous authenticator * @return an updated UserSession */ - def authenticate(asyncClient: AsyncHttpClient, accessToken: OAuth2AccessToken, user: UserSession): Future[UserSession] + def authenticate( + asyncClient: AsyncHttpClient, accessToken: OAuth2AccessToken, user: UserSession) + : Future[UserSession] } val ORGANIZATION_URL = "organization-url" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala index 54e6eef..c2b18fd 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,8 +22,6 @@ import com.github.scribejava.apis.google.GoogleJsonTokenExtractor import com.github.scribejava.core.builder.api.DefaultApi20 import com.github.scribejava.core.extractors.TokenExtractor import com.github.scribejava.core.model._ -import io.gearpump.services.security.oauth2.OAuth2Authenticator -import io.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator.AsyncGoogleApi20 import spray.json._ /** @@ -34,34 +32,40 @@ import spray.json._ * Pre-requisite steps to use this Authenticator: * * Step1: Register your website as an OAuth2 Application on Google - * 1) Create an application representing your website at [[https://console.developers.google.com]] - * 2) In "API Manager" of your created application, enable API "Google+ API" - * 3) Create OAuth client ID for this application. In "Credentials" tab of "API Manager", + * 1) Create an application representing your website at [[https://console.developers.google.com]] + * + * 2) In "API Manager" of your created application, enable API "Google+ API" + * + * 3) Create OAuth client ID for this application. In "Credentials" tab of "API Manager", * choose "Create credentials", and then select OAuth client ID. Follow the wizard * to set callback URL, and generate client ID, and client Secret. Callback URL is NOT optional. * * Step2: Configure the OAuth2 information in gear.conf - * 1) Enable OAuth2 authentication by setting "gearpump.ui-security.oauth2-authenticator-enabled" + * + * 1) Enable OAuth2 authentication by setting "gearpump.ui-security.oauth2-authenticator-enabled" * as true. - * 2) Configure section "gearpump.ui-security.oauth2-authenticators.google". Please make sure + * + * 2) Configure section "gearpump.ui-security.oauth2-authenticators.google". Please make sure * class name, client ID, client Secret, and callback URL are set properly. * - * @note callback URL set here should match what is configured on Google in step1. + * NOTE: callback URL set here should match what is configured on Google in step1. * * Step3: Restart the UI service and try out the Google social login button in UI. * - * @note OAuth requires Internet access, @see [[OAuth2Authenticator]] to find some helpful tutorials. + * NOTE: OAuth requires Internet access, @see + * [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] to find some helpful tutorials * - * @note Google use scope to define what data can be fetched by OAuth2. Currently we use profile - * [[https://www.googleapis.com/auth/userinfo.email]]. However, Google may change the profile in future. + * NOTE: Google use scope to define what data can be fetched by OAuth2. Currently we use profile + * [[https://www.googleapis.com/auth/userinfo.email]]. However, Google may change the profile + * in future. * - * @todo Currently, this doesn't verify the state from Google OAuth2 response. + * TODO Currently, this doesn't verify the state from Google OAuth2 response. * - * @see [[OAuth2Authenticator]] for more API information. + * See [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] for more API information. */ class GoogleOAuth2Authenticator extends BaseOAuth2Authenticator { - import GoogleOAuth2Authenticator._ + import io.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator._ protected override def authorizeUrl: String = AuthorizeUrl @@ -83,15 +87,17 @@ class GoogleOAuth2Authenticator extends BaseOAuth2Authenticator { object GoogleOAuth2Authenticator { - import BaseOAuth2Authenticator._ + import io.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator._ + // scalastyle:off line.size.limit val AuthorizeUrl = "https://accounts.google.com/o/oauth2/auth?response_type=%s&client_id=%s&redirect_uri=%s&scope=%s" + // scalastyle:on line.size.limit val AccessEndpoint = "https://www.googleapis.com/oauth2/v4/token" val ResourceUrl = "https://www.googleapis.com/plus/v1/people/me" val Scope = "https://www.googleapis.com/auth/userinfo.email" private class AsyncGoogleApi20(authorizeUrl: String, accessEndpoint: String) - extends BaseApi20(authorizeUrl, accessEndpoint) { + extends BaseApi20(authorizeUrl, accessEndpoint) { override def getAccessTokenExtractor: TokenExtractor[OAuth2AccessToken] = { GoogleJsonTokenExtractor.instance http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala b/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala index 0ae4505..f95474e 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,19 +18,23 @@ package io.gearpump.services.util -import io.gearpump.WorkerId -import io.gearpump.util.Graph import upickle.Js +import io.gearpump.cluster.worker.WorkerId +import io.gearpump.util.Graph + object UpickleUtil { - //TODO: upickle cannot infer the reader automatically due to - // issue https://github.com/lihaoyi/upickle-pprint/issues/102 - implicit val graphReader: upickle.default.Reader[Graph[Int, String]] = upickle.default.Reader[Graph[Int, String]] { - case Js.Obj(verties, edges) => - val vertexList = upickle.default.readJs[List[Int]](verties._2) - val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2) - Graph(vertexList, edgeList) + // For implicit type, we need to add EXPLICIT return type, otherwise, upickle may NOT infer the + // reader type automatically. + // See issue https://github.com/lihaoyi/upickle-pprint/issues/102 + implicit val graphReader: upickle.default.Reader[Graph[Int, String]] = { + upickle.default.Reader[Graph[Int, String]] { + case Js.Obj(verties, edges) => + val vertexList = upickle.default.readJs[List[Int]](verties._2) + val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2) + Graph(vertexList, edgeList) + } } implicit val workerIdReader: upickle.default.Reader[WorkerId] = upickle.default.Reader[WorkerId] {
