http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala deleted file mode 100644 index 59da80d..0000000 --- a/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.services - -import scala.concurrent.duration._ -import scala.util.{Success, Try} - -import akka.actor.ActorRef -import akka.http.scaladsl.model.headers.`Cache-Control` -import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} -import akka.testkit.TestActor.{AutoPilot, KeepRunning} -import akka.testkit.{TestKit, TestProbe} -import com.typesafe.config.{Config, ConfigFactory} -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} -import org.slf4j.Logger -import upickle.default.read - -import io.gearpump.cluster.AppMasterToMaster.GeneralAppMasterSummary -import io.gearpump.cluster.ClientToMaster.{GetLastFailure, QueryAppMasterConfig, QueryHistoryMetrics, ResolveAppId} -import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest} -import io.gearpump.cluster.MasterToClient._ -import io.gearpump.cluster.TestUtil -import io.gearpump.jarstore.JarStoreService -import io.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig} -import io.gearpump.util.LogUtil -// NOTE: This cannot be removed!!! -import io.gearpump.services.util.UpickleUtil._ - -class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest - with Matchers with BeforeAndAfterAll { - - override def testConfig: Config = TestUtil.UI_CONFIG - - private val LOG: Logger = LogUtil.getLogger(getClass) - private def actorRefFactory = system - - val mockAppMaster = TestProbe() - val failure = LastFailure(System.currentTimeMillis(), "Some error") - - lazy val jarStoreService = JarStoreService.get(system.settings.config) - - def jarStore: JarStoreService = jarStoreService - - private def master = mockMaster.ref - - private def appMasterRoute = new AppMasterService(master, jarStore, system).route - - mockAppMaster.setAutoPilot { - new AutoPilot { - def run(sender: ActorRef, msg: Any): AutoPilot = msg match { - case AppMasterDataDetailRequest(appId) => - sender ! 
GeneralAppMasterSummary(appId) - KeepRunning - case QueryHistoryMetrics(path, _, _, _) => - sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem]) - KeepRunning - case GetLastFailure(appId) => - sender ! failure - KeepRunning - case GetExecutorSummary(0) => - sender ! ExecutorSummary.empty - KeepRunning - case QueryExecutorConfig(0) => - sender ! ExecutorConfig(system.settings.config) - KeepRunning - } - } - } - - val mockMaster = TestProbe() - mockMaster.setAutoPilot { - new AutoPilot { - def run(sender: ActorRef, msg: Any): AutoPilot = msg match { - case ResolveAppId(0) => - sender ! ResolveAppIdResult(Success(mockAppMaster.ref)) - KeepRunning - case AppMasterDataRequest(appId, _) => - sender ! AppMasterData("active") - KeepRunning - case QueryAppMasterConfig(appId) => - sender ! AppMasterConfig(null) - KeepRunning - } - } - } - - "AppMasterService" should "return a JSON structure for GET request when detail = false" in { - implicit val customTimeout = RouteTestTimeout(15.seconds) - Get(s"/api/$REST_VERSION/appmaster/0?detail=false") ~> appMasterRoute ~> check { - val responseBody = responseAs[String] - read[AppMasterData](responseBody) - - // Checks the header, should contains no-cache header. 
- // Cache-Control:no-cache, max-age=0 - val noCache = header[`Cache-Control`].get.value() - assert(noCache == "no-cache, max-age=0") - } - - Get(s"/api/$REST_VERSION/appmaster/0?detail=true") ~> appMasterRoute ~> check { - val responseBody = responseAs[String] - } - } - - "MetricsQueryService" should "return history metrics" in { - implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/appmaster/0/metrics/processor") ~> appMasterRoute) ~> check { - val responseBody = responseAs[String] - val config = Try(ConfigFactory.parseString(responseBody)) - assert(config.isSuccess) - } - } - - "AppMaster" should "return lastest error" in { - implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/appmaster/0/errors") ~> appMasterRoute) ~> check { - val responseBody = responseAs[String] - assert(read[LastFailure](responseBody) == failure) - } - } - - "ConfigQueryService" should "return config for application" in { - implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/appmaster/0/config") ~> appMasterRoute) ~> check { - val responseBody = responseAs[String] - val config = Try(ConfigFactory.parseString(responseBody)) - assert(config.isSuccess) - } - } - - it should "return config for executor " in { - implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/appmaster/0/executor/0/config") ~> appMasterRoute) ~> check { - val responseBody = responseAs[String] - val config = Try(ConfigFactory.parseString(responseBody)) - assert(config.isSuccess) - } - } - - it should "return return executor summary" in { - implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/appmaster/0/executor/0") ~> appMasterRoute) ~> check { - val responseBody = responseAs[String] - val executorSummary = read[ExecutorSummary](responseBody) - assert(executorSummary.id == 0) - } - } - - override def afterAll { - TestKit.shutdownActorSystem(system) - } -}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala deleted file mode 100644 index a24db18..0000000 --- a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.services - -import java.io.File -import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Success, Try} - -import akka.actor.ActorRef -import akka.http.scaladsl.marshalling.Marshal -import akka.http.scaladsl.model._ -import akka.http.scaladsl.model.headers.`Cache-Control` -import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} -import akka.stream.scaladsl.{FileIO, Source} -import akka.testkit.TestActor.{AutoPilot, KeepRunning} -import akka.testkit.TestProbe -import com.typesafe.config.{Config, ConfigFactory} -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData} -import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ResolveWorkerId, SubmitApplication} -import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData, AppMastersDataRequest, WorkerList} -import io.gearpump.cluster.MasterToClient._ -import io.gearpump.cluster.TestUtil -import io.gearpump.cluster.worker.{WorkerId, WorkerSummary} -import io.gearpump.jarstore.JarStoreService -import io.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest} -// NOTE: This cannot be removed!!! 
-import io.gearpump.services.util.UpickleUtil._ -import io.gearpump.streaming.ProcessorDescription -import io.gearpump.util.Graph - -class MasterServiceSpec extends FlatSpec with ScalatestRouteTest - with Matchers with BeforeAndAfterAll { - import upickle.default.{read, write} - - override def testConfig: Config = TestUtil.UI_CONFIG - - private def actorRefFactory = system - val workerId = 0 - val mockWorker = TestProbe() - - lazy val jarStoreService = JarStoreService.get(system.settings.config) - - private def master = mockMaster.ref - - def jarStore: JarStoreService = jarStoreService - - private def masterRoute = new MasterService(master, jarStore, system).route - - mockWorker.setAutoPilot { - new AutoPilot { - def run(sender: ActorRef, msg: Any): AutoPilot = msg match { - case GetWorkerData(workerId) => - sender ! WorkerData(WorkerSummary.empty.copy(state = "active", workerId = workerId)) - KeepRunning - case QueryHistoryMetrics(path, _, _, _) => - sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem]) - KeepRunning - } - } - } - - val mockMaster = TestProbe() - mockMaster.setAutoPilot { - new AutoPilot { - def run(sender: ActorRef, msg: Any): AutoPilot = msg match { - case GetMasterData => - sender ! MasterData(null) - KeepRunning - case AppMastersDataRequest => - sender ! AppMastersData(List.empty[AppMasterData]) - KeepRunning - case GetAllWorkers => - sender ! WorkerList(List(WorkerId(0, 0L))) - KeepRunning - case ResolveWorkerId(WorkerId(0, 0L)) => - sender ! ResolveWorkerIdResult(Success(mockWorker.ref)) - KeepRunning - case QueryHistoryMetrics(path, _, _, _) => - sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem]) - KeepRunning - case QueryMasterConfig => - sender ! MasterConfig(null) - KeepRunning - case submit: SubmitApplication => - sender ! 
SubmitApplicationResult(Success(0)) - KeepRunning - } - } - } - - it should "return master info when asked" in { - implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/master") ~> masterRoute) ~> check { - // Checks the type - val content = responseAs[String] - read[MasterData](content) - - // Checks the header, should contains no-cache header. - // Cache-Control:no-cache, max-age=0 - val noCache = header[`Cache-Control`].get.value() - assert(noCache == "no-cache, max-age=0") - } - - mockMaster.expectMsg(GetMasterData) - } - - it should "return a json structure of appMastersData for GET request" in { - implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/master/applist") ~> masterRoute) ~> check { - // Checks the type - read[AppMastersData](responseAs[String]) - } - mockMaster.expectMsg(AppMastersDataRequest) - } - - it should "return a json structure of worker data for GET request" in { - implicit val customTimeout = RouteTestTimeout(25.seconds) - Get(s"/api/$REST_VERSION/master/workerlist") ~> masterRoute ~> check { - // Checks the type - val workerListJson = responseAs[String] - val workers = read[List[WorkerSummary]](workerListJson) - assert(workers.size > 0) - workers.foreach { worker => - worker.state shouldBe "active" - } - } - mockMaster.expectMsg(GetAllWorkers) - mockMaster.expectMsgType[ResolveWorkerId] - mockWorker.expectMsgType[GetWorkerData] - } - - it should "return config for master" in { - implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/master/config") ~> masterRoute) ~> check { - val responseBody = responseAs[String] - val config = Try(ConfigFactory.parseString(responseBody)) - assert(config.isSuccess) - } - mockMaster.expectMsg(QueryMasterConfig) - } - - "submit invalid application" should "return an error" in { - implicit val routeTestTimeout = RouteTestTimeout(30.second) - val tempfile = new File("foo") - val request = entity(tempfile) - - 
Post(s"/api/$REST_VERSION/master/submitapp", request) ~> masterRoute ~> check { - assert(response.status.intValue == 500) - } - } - - private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = { - val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(), - FileIO.fromFile(file, chunkSize = 100000)) - - val body = Source.single( - Multipart.FormData.BodyPart( - "file", - entity, - Map("filename" -> file.getName))) - val form = Multipart.FormData(body) - - Marshal(form).to[RequestEntity] - } - - "MetricsQueryService" should "return history metrics" in { - implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/master/metrics/master") ~> masterRoute) ~> check { - val responseBody = responseAs[String] - val config = Try(ConfigFactory.parseString(responseBody)) - assert(config.isSuccess) - } - } - - "submitDag" should "submit a SubmitApplicationRequest and get an appId > 0" in { - import io.gearpump.util.Graph._ - val processors = Map( - 0 -> ProcessorDescription(0, "A", parallelism = 1), - 1 -> ProcessorDescription(1, "B", parallelism = 1) - ) - val dag = Graph(0 ~ "partitioner" ~> 1) - val jsonValue = write(SubmitApplicationRequest("complexdag", processors, dag, null)) - Post(s"/api/$REST_VERSION/master/submitdag", - HttpEntity(ContentTypes.`application/json`, jsonValue)) ~> masterRoute ~> check { - val responseBody = responseAs[String] - val submitApplicationResultValue = read[SubmitApplicationResultValue](responseBody) - assert(submitApplicationResultValue.appId >= 0, "invalid appid") - } - } - - "MasterService" should "return Gearpump built-in partitioner list" in { - (Get(s"/api/$REST_VERSION/master/partitioners") ~> masterRoute) ~> check { - val responseBody = responseAs[String] - val partitioners = read[BuiltinPartitioners](responseBody) - assert(partitioners.partitioners.length > 0, "invalid response") - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala deleted file mode 100644 index 3cf99e9..0000000 --- a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.services - -import scala.concurrent.duration._ - -import akka.actor.ActorSystem -import akka.http.scaladsl.model.FormData -import akka.http.scaladsl.model.headers.{Cookie, `Set-Cookie`, _} -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server.{AuthorizationFailedRejection, _} -import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} -import com.typesafe.config.Config -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import io.gearpump.cluster.TestUtil -// NOTE: This cannot be removed!!! 
-import io.gearpump.services.util.UpickleUtil._ - -class SecurityServiceSpec - extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll { - - override def testConfig: Config = TestUtil.UI_CONFIG - - implicit def actorSystem: ActorSystem = system - - it should "return 401 if not authenticated" in { - val security = new SecurityService(SecurityServiceSpec.resource, actorSystem) - - implicit val customTimeout = RouteTestTimeout(15.seconds) - - (Get(s"/resource") ~> security.route) ~> check { - assert(rejection.isInstanceOf[AuthenticationFailedRejection]) - } - } - - "guest" should "get protected resource after authentication" in { - val security = new SecurityService(SecurityServiceSpec.resource, actorSystem) - - implicit val customTimeout = RouteTestTimeout(15.seconds) - - var cookie: HttpCookiePair = null - (Post(s"/login", FormData("username" -> "guest", "password" -> "guest")) - ~> security.route) ~> check { - assert("{\"user\":\"guest\"}" == responseAs[String]) - assert(status.intValue() == 200) - assert(header[`Set-Cookie`].isDefined) - val httpCookie = header[`Set-Cookie`].get.cookie - assert(httpCookie.name == "gearpump_token") - cookie = HttpCookiePair.apply(httpCookie.name, httpCookie.value) - } - - // After authentication, everything is fine. - Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check { - responseAs[String] shouldEqual "OK" - } - - // However, guest cannot access high-permission operations, like POST. 
- Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check { - assert(rejection == AuthorizationFailedRejection) - } - - // Logout, should clear the session - Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check { - assert("{\"user\":\"guest\"}" == responseAs[String]) - assert(status.intValue() == 200) - assert(header[`Set-Cookie`].isDefined) - val httpCookie = header[`Set-Cookie`].get.cookie - assert(httpCookie.name == "gearpump_token") - assert(httpCookie.value == "deleted") - } - - // Access again, rejected this time. - Get("/resource") ~> security.route ~> check { - assert(rejection.isInstanceOf[AuthenticationFailedRejection]) - } - - Post("/resource") ~> security.route ~> check { - assert(rejection.isInstanceOf[AuthenticationFailedRejection]) - } - } - - "admin" should "get protected resource after authentication" in { - val security = new SecurityService(SecurityServiceSpec.resource, actorSystem) - - implicit val customTimeout = RouteTestTimeout(15.seconds) - - var cookie: HttpCookiePair = null - (Post(s"/login", FormData("username" -> "admin", "password" -> "admin")) - ~> security.route) ~> check { - assert("{\"user\":\"admin\"}" == responseAs[String]) - assert(status.intValue() == 200) - assert(header[`Set-Cookie`].isDefined) - val httpCookie = header[`Set-Cookie`].get.cookie - assert(httpCookie.name == "gearpump_token") - cookie = HttpCookiePair(httpCookie.name, httpCookie.value) - } - - // After authentication, everything is fine. 
- Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check { - responseAs[String] shouldEqual "OK" - } - - // Not like guest, admimn can also access POST - Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check { - responseAs[String] shouldEqual "OK" - } - - // Logout, should clear the session - Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check { - assert("{\"user\":\"admin\"}" == responseAs[String]) - assert(status.intValue() == 200) - assert(header[`Set-Cookie`].isDefined) - val httpCookie = header[`Set-Cookie`].get.cookie - assert(httpCookie.name == "gearpump_token") - assert(httpCookie.value == "deleted") - } - - // Access again, rejected this time. - Get("/resource") ~> security.route ~> check { - assert(rejection.isInstanceOf[AuthenticationFailedRejection]) - } - - Post("/resource") ~> security.route ~> check { - assert(rejection.isInstanceOf[AuthenticationFailedRejection]) - } - } -} - -object SecurityServiceSpec { - - val resource = new RouteService { - override def route: Route = { - get { - path("resource") { - complete("OK") - } - } ~ - post { - path("resource") { - complete("OK") - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala deleted file mode 100644 index f61e2f5..0000000 --- a/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.services - -import scala.concurrent.duration._ -import scala.util.Try - -import akka.http.scaladsl.model.headers.`Cache-Control` -import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} -import com.typesafe.config.{Config, ConfigFactory} -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import io.gearpump.cluster.TestUtil -import io.gearpump.util.Constants -// NOTE: This cannot be removed!!! -import io.gearpump.services.util.UpickleUtil._ - -class StaticServiceSpec - extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll { - - override def testConfig: Config = TestUtil.UI_CONFIG - private val supervisorPath = system.settings.config.getString( - Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH) - - protected def route = new StaticService(system, supervisorPath).route - - it should "return version" in { - implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/version") ~> route) ~> check { - val responseBody = responseAs[String] - val config = Try(ConfigFactory.parseString(responseBody)) - assert(responseBody == "Unknown-Version") - - // By default, it will be cached. 
- assert(header[`Cache-Control`].isEmpty) - } - } - - it should "get correct supervisor path" in { - implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/supervisor-actor-path") ~> route) ~> check { - val responseBody = responseAs[String] - val defaultSupervisorPath = "" - assert(responseBody == defaultSupervisorPath) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala deleted file mode 100644 index 2676a16..0000000 --- a/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.services - -import scala.concurrent.duration._ -import scala.util.{Success, Try} - -import akka.actor.ActorRef -import akka.http.scaladsl.model.headers.`Cache-Control` -import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} -import akka.testkit.TestActor.{AutoPilot, KeepRunning} -import akka.testkit.{TestKit, TestProbe} -import com.typesafe.config.{Config, ConfigFactory} -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData} -import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ResolveWorkerId} -import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, ResolveWorkerIdResult, WorkerConfig} -import io.gearpump.cluster.TestUtil -import io.gearpump.cluster.worker.{WorkerId, WorkerSummary} -import io.gearpump.jarstore.JarStoreService -// NOTE: This cannot be removed!!! -import io.gearpump.services.util.UpickleUtil._ - -class WorkerServiceSpec - extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll { - - override def testConfig: Config = TestUtil.DEFAULT_CONFIG - - protected def actorRefFactory = system - - val mockWorker = TestProbe() - - protected def master = mockMaster.ref - - lazy val jarStoreService = JarStoreService.get(system.settings.config) - - protected def workerRoute = new WorkerService(master, system).route - - mockWorker.setAutoPilot { - new AutoPilot { - def run(sender: ActorRef, msg: Any): AutoPilot = msg match { - case GetWorkerData(workerId) => - sender ! WorkerData(WorkerSummary.empty) - KeepRunning - case QueryWorkerConfig(workerId) => - sender ! WorkerConfig(null) - KeepRunning - case QueryHistoryMetrics(path, _, _, _) => - sender ! 
HistoryMetrics(path, List.empty[HistoryMetricsItem]) - KeepRunning - } - } - } - - val mockMaster = TestProbe() - mockMaster.setAutoPilot { - new AutoPilot { - def run(sender: ActorRef, msg: Any): AutoPilot = msg match { - case ResolveWorkerId(workerId) => - sender ! ResolveWorkerIdResult(Success(mockWorker.ref)) - KeepRunning - } - } - } - - "ConfigQueryService" should "return config for worker" in { - implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/config") - ~> workerRoute) ~> check { - val responseBody = responseAs[String] - val config = Try(ConfigFactory.parseString(responseBody)) - assert(config.isSuccess) - } - } - - it should "return WorkerData" in { - implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(1, 0L))}") - ~> workerRoute) ~> check { - val responseBody = responseAs[String] - val config = Try(ConfigFactory.parseString(responseBody)) - assert(config.isSuccess) - - // Check the header, should contains no-cache header. 
- // Cache-Control:no-cache, max-age=0 - val noCache = header[`Cache-Control`].get.value() - assert(noCache == "no-cache, max-age=0") - } - } - - "MetricsQueryService" should "return history metrics" in { - implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/metrics/worker") - ~> workerRoute) ~> check { - val responseBody = responseAs[String] - val config = Try(ConfigFactory.parseString(responseBody)) - assert(config.isSuccess) - } - } - - override def afterAll { - TestKit.shutdownActorSystem(system) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala deleted file mode 100644 index 136026a..0000000 --- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.services.security.oauth2 - -import scala.collection.JavaConverters._ -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.ActorSystem -import akka.http.scaladsl.model.HttpEntity.Strict -import akka.http.scaladsl.model.MediaTypes._ -import akka.http.scaladsl.model.Uri.{Path, Query} -import akka.http.scaladsl.model._ -import akka.http.scaladsl.testkit.ScalatestRouteTest -import com.typesafe.config.ConfigFactory -import org.scalatest.FlatSpec - -import io.gearpump.security.Authenticator -// NOTE: This cannot be removed!!! -import io.gearpump.services.util.UpickleUtil._ -import io.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator - -class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest { - - implicit val actorSystem: ActorSystem = system - private val server = new MockOAuth2Server(system, null) - server.start() - private val serverHost = s"http://127.0.0.1:${server.port}" - - val configMap = Map( - "class" -> "io.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator", - "callback" -> s"$serverHost/login/oauth2/cloudfoundryuaa/callback", - "clientid" -> "gearpump_test2", - "clientsecret" -> "gearpump_test2", - "default-userrole" -> "user", - "icon" -> "/icons/uaa.png", - "uaahost" -> serverHost, - "additional-authenticator-enabled" -> "false") - - val configString = ConfigFactory.parseMap(configMap.asJava) - - lazy val uaa = { - val uaa = new CloudFoundryUAAOAuth2Authenticator - uaa.init(configString, system.dispatcher) - uaa - } - - it should "generate the correct authorization request" in { - val parameters = Uri(uaa.getAuthorizationUrl()).query().toMap - assert(parameters("response_type") == "code") - assert(parameters("client_id") == configMap("clientid")) - assert(parameters("redirect_uri") == configMap("callback")) - 
assert(parameters("scope") == "openid,cloud_controller.read") - } - - it should "authenticate the authorization code and return the correct profile" in { - val code = Map("code" -> "QGGVeA") - val accessToken = "e2922002-0218-4513-a62d-1da2ba64ee4c" - val refreshToken = "eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI2Nm" - val mail = "[email protected]" - - def accessTokenEndpoint(request: HttpRequest): HttpResponse = { - assert(request.getHeader("Authorization").get.value() == - "Basic Z2VhcnB1bXBfdGVzdDI6Z2VhcnB1bXBfdGVzdDI=") - assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded") - - val body = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8") - val form = Uri./.withQuery(Query(body)).query().toMap - - assert(form("grant_type") == "authorization_code") - assert(form("code") == "QGGVeA") - assert(form("response_type") == "token") - assert(form("redirect_uri") == configMap("callback")) - - val response = - s""" - |{ - | "access_token": "$accessToken", - | "token_type": "bearer", - | "refresh_token": "$refreshToken", - | "expires_in": 43199, - | "scope": "openid", - | "jti": "e8739474-b2fa-42eb-a9ad-e065bf79d7e9" - |} - """.stripMargin - HttpResponse(entity = HttpEntity(ContentType(`application/json`), response)) - } - - def protectedResourceEndpoint(request: HttpRequest): HttpResponse = { - assert(request.getUri().query().get("access_token").get == accessToken) - val response = - s""" - |{ - | "user_id": "e2922002-0218-4513-a62d-1da2ba64ee4c", - | "user_name": "user", - | "email": "$mail" - |} - """.stripMargin - HttpResponse(entity = HttpEntity(ContentType(`application/json`), response)) - } - - server.requestHandler = (request: HttpRequest) => { - if (request.uri.path.startsWith(Path("/oauth/token"))) { - accessTokenEndpoint(request) - } else if (request.uri.path.startsWith(Path("/userinfo"))) { - protectedResourceEndpoint(request) - } else { - fail("Unexpected access to " + request.uri.toString()) - } - } - - val userFuture = 
uaa.authenticate(code) - val user = Await.result(userFuture, 30.seconds) - assert(user.user == mail) - assert(user.permissionLevel == Authenticator.User.permissionLevel) - } - - override def cleanUp(): Unit = { - server.stop() - uaa.close() - super.cleanUp() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala deleted file mode 100644 index 70d8bb0..0000000 --- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.services.security.oauth2 - -import scala.collection.JavaConverters._ -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.ActorSystem -import akka.http.scaladsl.model.HttpEntity.Strict -import akka.http.scaladsl.model.MediaTypes._ -import akka.http.scaladsl.model.Uri.{Path, Query} -import akka.http.scaladsl.model._ -import akka.http.scaladsl.testkit.ScalatestRouteTest -import com.typesafe.config.ConfigFactory -import org.scalatest.FlatSpec - -import io.gearpump.security.Authenticator -// NOTE: This cannot be removed!!! -import io.gearpump.services.util.UpickleUtil._ -import io.gearpump.services.security.oauth2.GoogleOAuth2AuthenticatorSpec.MockGoogleAuthenticator -import io.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator - -class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest { - - implicit val actorSystem: ActorSystem = system - private val server = new MockOAuth2Server(system, null) - server.start() - private val serverHost = s"http://127.0.0.1:${server.port}" - - val configMap = Map( - "class" -> "io.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator", - "callback" -> s"$serverHost/login/oauth2/google/callback", - "clientid" -> "170234147043-a1tag68jtq6ab4bi11jvsj7vbaqcmhkt.apps.googleusercontent.com", - "clientsecret" -> "ioeWLLDipz2S7aTDXym2-obe", - "default-userrole" -> "guest", - "icon" -> "/icons/google.png") - - val configString = ConfigFactory.parseMap(configMap.asJava) - - private lazy val google = { - val google = new MockGoogleAuthenticator(serverHost) - google.init(configString, system.dispatcher) - google - } - - it should "generate the correct authorization request" in { - val parameters = Uri(google.getAuthorizationUrl()).query().toMap - assert(parameters("response_type") == "code") - assert(parameters("client_id") == configMap("clientid")) - assert(parameters("redirect_uri") == configMap("callback")) - assert(parameters("scope") == 
GoogleOAuth2Authenticator.Scope) - } - - it should "authenticate the authorization code and return the correct profile" in { - val code = Map("code" -> "4/PME0pfxjiBA42SukR-OTGl7fpFzTWzvZPf1TbkpXL4M#") - val accessToken = "e2922002-0218-4513-a62d-1da2ba64ee4c" - val refreshToken = "eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI2Nm" - val mail = "[email protected]" - - def accessTokenEndpoint(request: HttpRequest): HttpResponse = { - - assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded") - - val body = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8") - val form = Uri./.withQuery(Query(body)).query().toMap - - assert(form("client_id") == configMap("clientid")) - assert(form("client_secret") == configMap("clientsecret")) - assert(form("grant_type") == "authorization_code") - assert(form("code") == code("code")) - assert(form("redirect_uri") == configMap("callback")) - assert(form("scope") == GoogleOAuth2Authenticator.Scope) - - // scalastyle:off line.size.limit - val response = - s""" - |{ - | "access_token": "$accessToken", - | "token_type": "Bearer", - | "expires_in": 3591, - | "id_token": "eyJhbGciOiJSUzI1NiIsImtpZCI6ImY1NjQyYzY2MzdhYWQyOTJiOThlOGIwN2MwMzIxN2QwMzBmOTdkODkifQ.eyJpc3" - |} - """.stripMargin - // scalastyle:on line.size.limit - - HttpResponse(entity = HttpEntity(ContentType(`application/json`), response)) - } - - def protectedResourceEndpoint(request: HttpRequest): HttpResponse = { - assert(request.getUri().query().get("access_token").get == accessToken) - val response = - s""" - |{ - | "kind": "plus#person", - | "etag": "4OZ_Kt6ujOh1jaML_U6RM6APqoE/mZ57HcMOYXaNXYXS5XEGJ9yVsI8", - | "nickname": "gearpump", - | "gender": "female", - | "emails": [ - | { - | "value": "$mail", - | "type": "account" - | } - | ] - | } - """.stripMargin - HttpResponse(entity = HttpEntity(ContentType(`application/json`), response)) - } - - server.requestHandler = (request: HttpRequest) => { - if 
(request.uri.path.startsWith(Path("/oauth2/v4/token"))) { - accessTokenEndpoint(request) - } else if (request.uri.path.startsWith(Path("/plus/v1/people/me"))) { - protectedResourceEndpoint(request) - } else { - fail("Unexpected access to " + request.uri.toString()) - } - } - - val userFuture = google.authenticate(code) - val user = Await.result(userFuture, 30.seconds) - assert(user.user == mail) - assert(user.permissionLevel == Authenticator.Guest.permissionLevel) - } - - override def cleanUp(): Unit = { - server.stop() - google.close() - super.cleanUp() - } -} - -object GoogleOAuth2AuthenticatorSpec { - class MockGoogleAuthenticator(host: String) extends GoogleOAuth2Authenticator { - protected override def authorizeUrl: String = { - super.authorizeUrl.replace("https://accounts.google.com", host) - } - - protected override def accessTokenEndpoint: String = { - super.accessTokenEndpoint.replace("https://www.googleapis.com", host) - } - - protected override def protectedResourceUrl: String = { - super.protectedResourceUrl.replace("https://www.googleapis.com", host) - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala deleted file mode 100644 index c03532d..0000000 --- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.services.security.oauth2 - -import scala.concurrent.{Await, Future} - -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.http.scaladsl.Http.ServerBinding -import akka.http.scaladsl.model.{HttpRequest, HttpResponse} -import akka.stream.ActorMaterializer -import akka.stream.scaladsl.Sink - -import io.gearpump.util.Util -// NOTE: This cannot be removed!! -import io.gearpump.services.util.UpickleUtil._ - -/** - * Serves as a fake OAuth2 server. 
- */ -class MockOAuth2Server( - actorSystem: ActorSystem, - var requestHandler: HttpRequest => HttpResponse) { - - implicit val system: ActorSystem = actorSystem - implicit val materializer = ActorMaterializer() - implicit val ec = system.dispatcher - - private var _port: Int = 0 - private var bindingFuture: Future[ServerBinding] = null - - def port: Int = _port - - def start(): Unit = { - _port = Util.findFreePort().get - - val serverSource = Http().bind(interface = "127.0.0.1", port = _port) - bindingFuture = { - serverSource.to(Sink.foreach { connection => - connection handleWithSyncHandler requestHandler - }).run() - } - } - - def stop(): Unit = { - import scala.concurrent.duration._ - Await.result(bindingFuture.map(_.unbind()), 120.seconds) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala deleted file mode 100644 index b074813..0000000 --- a/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.services.util - -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} -import upickle.default.{read, write} - -import io.gearpump.cluster.UserConfig -import io.gearpump.metrics.Metrics.{Counter, MetricType} -import io.gearpump.services.util.UpickleUtil._ -import io.gearpump.streaming.ProcessorId -import io.gearpump.streaming.appmaster.{ProcessorSummary, StreamAppMasterSummary} -import io.gearpump.util.Graph - -class UpickleSpec extends FlatSpec with Matchers with BeforeAndAfterEach { - - "UserConfig" should "serialize and deserialize with upickle correctly" in { - val conf = UserConfig.empty.withString("key", "value") - val serialized = write(conf) - val deserialized = read[UserConfig](serialized) - assert(deserialized.getString("key") == Some("value")) - } - - "Graph" should "be able to serialize/deserialize correctly" in { - val graph = new Graph[Int, String](List(0, 1), List((0, "edge", 1))) - val serialized = write(graph) - - val deserialized = read[Graph[Int, String]](serialized) - - graph.vertices.toSet shouldBe deserialized.vertices.toSet - graph.edges.toSet shouldBe deserialized.edges.toSet - } - - "MetricType" should "be able to serialize/deserialize correctly" in { - val metric: MetricType = Counter("counter", 100L) - val serialized = write(metric) - val deserialized = read[MetricType](serialized) - metric shouldBe deserialized - } - - "StreamingAppMasterDataDetail" should "serialize and deserialize with upickle correctly" in { - val app = new StreamAppMasterSummary(appId = 0, - processors = 
Map.empty[ProcessorId, ProcessorSummary], - processorLevels = Map.empty[ProcessorId, Int] - ) - - val serialized = write(app) - val deserialized = read[StreamAppMasterSummary](serialized) - assert(deserialized == app) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/AdminServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/AdminServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/AdminServiceSpec.scala new file mode 100644 index 0000000..b7f294c --- /dev/null +++ b/services/jvm/src/test/scala/org/apache/gearpump/services/AdminServiceSpec.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.services + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor.ActorSystem +import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} +import com.typesafe.config.Config +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + +import org.apache.gearpump.cluster.TestUtil + +// NOTE: This cannot be removed!!! +import org.apache.gearpump.services.util.UpickleUtil._ + +class AdminServiceSpec + extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll { + + override def testConfig: Config = TestUtil.DEFAULT_CONFIG + + implicit def actorSystem: ActorSystem = system + + it should "shutdown the ActorSystem when receiving terminate" in { + val route = new AdminService(actorSystem).route + implicit val customTimeout = RouteTestTimeout(15.seconds) + (Post(s"/terminate") ~> route) ~> check { + assert(status.intValue() == 404) + } + + Await.result(actorSystem.whenTerminated, 20.seconds) + + // terminate should terminate current actor system + assert(actorSystem.whenTerminated.isCompleted) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala new file mode 100644 index 0000000..2ece554 --- /dev/null +++ b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.services + +import scala.concurrent.duration._ +import scala.util.{Success, Try} + +import akka.actor.ActorRef +import akka.http.scaladsl.model.headers.`Cache-Control` +import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} +import akka.testkit.TestActor.{AutoPilot, KeepRunning} +import akka.testkit.{TestKit, TestProbe} +import com.typesafe.config.{Config, ConfigFactory} +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} +import org.slf4j.Logger +import upickle.default.read + +import org.apache.gearpump.cluster.AppMasterToMaster.GeneralAppMasterSummary +import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, QueryAppMasterConfig, QueryHistoryMetrics, ResolveAppId} +import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest} +import org.apache.gearpump.cluster.MasterToClient._ +import org.apache.gearpump.cluster.TestUtil +import org.apache.gearpump.jarstore.JarStoreService +import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig} +import org.apache.gearpump.util.LogUtil +// NOTE: This cannot be removed!!! 
+import org.apache.gearpump.services.util.UpickleUtil._ + +class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest + with Matchers with BeforeAndAfterAll { + + override def testConfig: Config = TestUtil.UI_CONFIG + + private val LOG: Logger = LogUtil.getLogger(getClass) + private def actorRefFactory = system + + val mockAppMaster = TestProbe() + val failure = LastFailure(System.currentTimeMillis(), "Some error") + + lazy val jarStoreService = JarStoreService.get(system.settings.config) + + def jarStore: JarStoreService = jarStoreService + + private def master = mockMaster.ref + + private def appMasterRoute = new AppMasterService(master, jarStore, system).route + + mockAppMaster.setAutoPilot { + new AutoPilot { + def run(sender: ActorRef, msg: Any): AutoPilot = msg match { + case AppMasterDataDetailRequest(appId) => + sender ! GeneralAppMasterSummary(appId) + KeepRunning + case QueryHistoryMetrics(path, _, _, _) => + sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem]) + KeepRunning + case GetLastFailure(appId) => + sender ! failure + KeepRunning + case GetExecutorSummary(0) => + sender ! ExecutorSummary.empty + KeepRunning + case QueryExecutorConfig(0) => + sender ! ExecutorConfig(system.settings.config) + KeepRunning + } + } + } + + val mockMaster = TestProbe() + mockMaster.setAutoPilot { + new AutoPilot { + def run(sender: ActorRef, msg: Any): AutoPilot = msg match { + case ResolveAppId(0) => + sender ! ResolveAppIdResult(Success(mockAppMaster.ref)) + KeepRunning + case AppMasterDataRequest(appId, _) => + sender ! AppMasterData("active") + KeepRunning + case QueryAppMasterConfig(appId) => + sender ! 
AppMasterConfig(null) + KeepRunning + } + } + } + + "AppMasterService" should "return a JSON structure for GET request when detail = false" in { + implicit val customTimeout = RouteTestTimeout(15.seconds) + Get(s"/api/$REST_VERSION/appmaster/0?detail=false") ~> appMasterRoute ~> check { + val responseBody = responseAs[String] + read[AppMasterData](responseBody) + + // Checks the header, should contains no-cache header. + // Cache-Control:no-cache, max-age=0 + val noCache = header[`Cache-Control`].get.value() + assert(noCache == "no-cache, max-age=0") + } + + Get(s"/api/$REST_VERSION/appmaster/0?detail=true") ~> appMasterRoute ~> check { + val responseBody = responseAs[String] + } + } + + "MetricsQueryService" should "return history metrics" in { + implicit val customTimeout = RouteTestTimeout(15.seconds) + (Get(s"/api/$REST_VERSION/appmaster/0/metrics/processor") ~> appMasterRoute) ~> check { + val responseBody = responseAs[String] + val config = Try(ConfigFactory.parseString(responseBody)) + assert(config.isSuccess) + } + } + + "AppMaster" should "return lastest error" in { + implicit val customTimeout = RouteTestTimeout(15.seconds) + (Get(s"/api/$REST_VERSION/appmaster/0/errors") ~> appMasterRoute) ~> check { + val responseBody = responseAs[String] + assert(read[LastFailure](responseBody) == failure) + } + } + + "ConfigQueryService" should "return config for application" in { + implicit val customTimeout = RouteTestTimeout(15.seconds) + (Get(s"/api/$REST_VERSION/appmaster/0/config") ~> appMasterRoute) ~> check { + val responseBody = responseAs[String] + val config = Try(ConfigFactory.parseString(responseBody)) + assert(config.isSuccess) + } + } + + it should "return config for executor " in { + implicit val customTimeout = RouteTestTimeout(15.seconds) + (Get(s"/api/$REST_VERSION/appmaster/0/executor/0/config") ~> appMasterRoute) ~> check { + val responseBody = responseAs[String] + val config = Try(ConfigFactory.parseString(responseBody)) + 
assert(config.isSuccess) + } + } + + it should "return return executor summary" in { + implicit val customTimeout = RouteTestTimeout(15.seconds) + (Get(s"/api/$REST_VERSION/appmaster/0/executor/0") ~> appMasterRoute) ~> check { + val responseBody = responseAs[String] + val executorSummary = read[ExecutorSummary](responseBody) + assert(executorSummary.id == 0) + } + } + + override def afterAll { + TestKit.shutdownActorSystem(system) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala new file mode 100644 index 0000000..e365e9f --- /dev/null +++ b/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.services + +import java.io.File +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Success, Try} + +import akka.actor.ActorRef +import akka.http.scaladsl.marshalling.Marshal +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers.`Cache-Control` +import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} +import akka.stream.scaladsl.{FileIO, Source} +import akka.testkit.TestActor.{AutoPilot, KeepRunning} +import akka.testkit.TestProbe +import com.typesafe.config.{Config, ConfigFactory} +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + +import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData} +import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ResolveWorkerId, SubmitApplication} +import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData, AppMastersDataRequest, WorkerList} +import org.apache.gearpump.cluster.MasterToClient._ +import org.apache.gearpump.cluster.TestUtil +import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary} +import org.apache.gearpump.jarstore.JarStoreService +import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest} +// NOTE: This cannot be removed!!! 
+import org.apache.gearpump.services.util.UpickleUtil._ +import org.apache.gearpump.streaming.ProcessorDescription +import org.apache.gearpump.util.Graph + +class MasterServiceSpec extends FlatSpec with ScalatestRouteTest + with Matchers with BeforeAndAfterAll { + import upickle.default.{read, write} + + override def testConfig: Config = TestUtil.UI_CONFIG + + private def actorRefFactory = system + val workerId = 0 + val mockWorker = TestProbe() + + lazy val jarStoreService = JarStoreService.get(system.settings.config) + + private def master = mockMaster.ref + + def jarStore: JarStoreService = jarStoreService + + private def masterRoute = new MasterService(master, jarStore, system).route + + mockWorker.setAutoPilot { + new AutoPilot { + def run(sender: ActorRef, msg: Any): AutoPilot = msg match { + case GetWorkerData(workerId) => + sender ! WorkerData(WorkerSummary.empty.copy(state = "active", workerId = workerId)) + KeepRunning + case QueryHistoryMetrics(path, _, _, _) => + sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem]) + KeepRunning + } + } + } + + val mockMaster = TestProbe() + mockMaster.setAutoPilot { + new AutoPilot { + def run(sender: ActorRef, msg: Any): AutoPilot = msg match { + case GetMasterData => + sender ! MasterData(null) + KeepRunning + case AppMastersDataRequest => + sender ! AppMastersData(List.empty[AppMasterData]) + KeepRunning + case GetAllWorkers => + sender ! WorkerList(List(WorkerId(0, 0L))) + KeepRunning + case ResolveWorkerId(WorkerId(0, 0L)) => + sender ! ResolveWorkerIdResult(Success(mockWorker.ref)) + KeepRunning + case QueryHistoryMetrics(path, _, _, _) => + sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem]) + KeepRunning + case QueryMasterConfig => + sender ! MasterConfig(null) + KeepRunning + case submit: SubmitApplication => + sender ! 
SubmitApplicationResult(Success(0)) + KeepRunning + } + } + } + + it should "return master info when asked" in { + implicit val customTimeout = RouteTestTimeout(15.seconds) + (Get(s"/api/$REST_VERSION/master") ~> masterRoute) ~> check { + // Checks the type + val content = responseAs[String] + read[MasterData](content) + + // Checks the header, should contains no-cache header. + // Cache-Control:no-cache, max-age=0 + val noCache = header[`Cache-Control`].get.value() + assert(noCache == "no-cache, max-age=0") + } + + mockMaster.expectMsg(GetMasterData) + } + + it should "return a json structure of appMastersData for GET request" in { + implicit val customTimeout = RouteTestTimeout(15.seconds) + (Get(s"/api/$REST_VERSION/master/applist") ~> masterRoute) ~> check { + // Checks the type + read[AppMastersData](responseAs[String]) + } + mockMaster.expectMsg(AppMastersDataRequest) + } + + it should "return a json structure of worker data for GET request" in { + implicit val customTimeout = RouteTestTimeout(25.seconds) + Get(s"/api/$REST_VERSION/master/workerlist") ~> masterRoute ~> check { + // Checks the type + val workerListJson = responseAs[String] + val workers = read[List[WorkerSummary]](workerListJson) + assert(workers.size > 0) + workers.foreach { worker => + worker.state shouldBe "active" + } + } + mockMaster.expectMsg(GetAllWorkers) + mockMaster.expectMsgType[ResolveWorkerId] + mockWorker.expectMsgType[GetWorkerData] + } + + it should "return config for master" in { + implicit val customTimeout = RouteTestTimeout(15.seconds) + (Get(s"/api/$REST_VERSION/master/config") ~> masterRoute) ~> check { + val responseBody = responseAs[String] + val config = Try(ConfigFactory.parseString(responseBody)) + assert(config.isSuccess) + } + mockMaster.expectMsg(QueryMasterConfig) + } + + "submit invalid application" should "return an error" in { + implicit val routeTestTimeout = RouteTestTimeout(30.second) + val tempfile = new File("foo") + val request = entity(tempfile) + + 
Post(s"/api/$REST_VERSION/master/submitapp", request) ~> masterRoute ~> check { + assert(response.status.intValue == 500) + } + } + + private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = { + val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(), + FileIO.fromFile(file, chunkSize = 100000)) + + val body = Source.single( + Multipart.FormData.BodyPart( + "file", + entity, + Map("filename" -> file.getName))) + val form = Multipart.FormData(body) + + Marshal(form).to[RequestEntity] + } + + "MetricsQueryService" should "return history metrics" in { + implicit val customTimeout = RouteTestTimeout(15.seconds) + (Get(s"/api/$REST_VERSION/master/metrics/master") ~> masterRoute) ~> check { + val responseBody = responseAs[String] + val config = Try(ConfigFactory.parseString(responseBody)) + assert(config.isSuccess) + } + } + + "submitDag" should "submit a SubmitApplicationRequest and get an appId > 0" in { + import org.apache.gearpump.util.Graph._ + val processors = Map( + 0 -> ProcessorDescription(0, "A", parallelism = 1), + 1 -> ProcessorDescription(1, "B", parallelism = 1) + ) + val dag = Graph(0 ~ "partitioner" ~> 1) + val jsonValue = write(SubmitApplicationRequest("complexdag", processors, dag, null)) + Post(s"/api/$REST_VERSION/master/submitdag", + HttpEntity(ContentTypes.`application/json`, jsonValue)) ~> masterRoute ~> check { + val responseBody = responseAs[String] + val submitApplicationResultValue = read[SubmitApplicationResultValue](responseBody) + assert(submitApplicationResultValue.appId >= 0, "invalid appid") + } + } + + "MasterService" should "return Gearpump built-in partitioner list" in { + (Get(s"/api/$REST_VERSION/master/partitioners") ~> masterRoute) ~> check { + val responseBody = responseAs[String] + val partitioners = read[BuiltinPartitioners](responseBody) + assert(partitioners.partitioners.length > 0, "invalid response") + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/SecurityServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/SecurityServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/SecurityServiceSpec.scala new file mode 100644 index 0000000..c8bfb29 --- /dev/null +++ b/services/jvm/src/test/scala/org/apache/gearpump/services/SecurityServiceSpec.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.services + +import scala.concurrent.duration._ + +import akka.actor.ActorSystem +import akka.http.scaladsl.model.FormData +import akka.http.scaladsl.model.headers.{Cookie, `Set-Cookie`, _} +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server.{AuthorizationFailedRejection, _} +import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} +import com.typesafe.config.Config +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + +import org.apache.gearpump.cluster.TestUtil +// NOTE: This cannot be removed!!! 
import org.apache.gearpump.services.util.UpickleUtil._

/**
 * Route-level tests for [[SecurityService]].
 *
 * Every test wraps the stub route from [[SecurityServiceSpec.resource]] in a fresh
 * SecurityService and drives it through the login -> access -> logout cycle, checking
 * which requests are answered and which are rejected.
 */
class SecurityServiceSpec
  extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {

  override def testConfig: Config = TestUtil.UI_CONFIG

  implicit def actorSystem: ActorSystem = system

  // Name of the cookie the service sets on login and overwrites on logout.
  private val TokenCookieName = "gearpump_token"

  /** Builds a new SecurityService guarding the stub resource route. */
  private def newSecurityService(): SecurityService =
    new SecurityService(SecurityServiceSpec.resource, actorSystem)

  /**
   * Posts the credentials to `/login`, verifies the success response and returns the
   * session cookie so that follow-up requests can present it.
   *
   * @param security service under test
   * @param user     user name; the response body is expected to echo it as JSON
   * @param password password submitted together with `user`
   * @return the name/value pair of the cookie found in the `Set-Cookie` header
   */
  private def login(security: SecurityService, user: String, password: String)(
      implicit timeout: RouteTestTimeout): HttpCookiePair = {
    Post("/login", FormData("username" -> user, "password" -> password)) ~>
      security.route ~> check {
        responseAs[String] shouldBe s"""{"user":"$user"}"""
        status.intValue() shouldBe 200
        // Fail with a readable message instead of calling .get on an empty Option.
        val sessionCookie = header[`Set-Cookie`]
          .map(_.cookie)
          .getOrElse(fail("login response carries no Set-Cookie header"))
        sessionCookie.name shouldBe TokenCookieName
        HttpCookiePair(sessionCookie.name, sessionCookie.value)
      }
  }

  /**
   * Posts to `/logout` with the given cookie, verifies that the token cookie is replaced
   * by the value "deleted", and then checks that requests sent without a cookie are
   * rejected for both GET and POST.
   *
   * @param security service under test
   * @param user     user name expected in the logout response body
   * @param cookie   session cookie obtained from [[login]]
   */
  private def logoutAndVerifyLockedOut(
      security: SecurityService, user: String, cookie: HttpCookiePair)(
      implicit timeout: RouteTestTimeout): Unit = {
    // Logout, should clear the session
    Post("/logout").addHeader(Cookie(cookie)) ~> security.route ~> check {
      responseAs[String] shouldBe s"""{"user":"$user"}"""
      status.intValue() shouldBe 200
      val clearedCookie = header[`Set-Cookie`]
        .map(_.cookie)
        .getOrElse(fail("logout response carries no Set-Cookie header"))
      clearedCookie.name shouldBe TokenCookieName
      clearedCookie.value shouldBe "deleted"
    }

    // Access again, rejected this time.
    Get("/resource") ~> security.route ~> check {
      rejection shouldBe an[AuthenticationFailedRejection]
    }

    Post("/resource") ~> security.route ~> check {
      rejection shouldBe an[AuthenticationFailedRejection]
    }
  }

  it should "return 401 if not authenticated" in {
    implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(15.seconds)
    val security = newSecurityService()

    // The route is not sealed here, so the missing credentials surface as a rejection.
    Get("/resource") ~> security.route ~> check {
      rejection shouldBe an[AuthenticationFailedRejection]
    }
  }

  "guest" should "get protected resource after authentication" in {
    implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(15.seconds)
    val security = newSecurityService()

    val cookie = login(security, user = "guest", password = "guest")

    // After authentication, everything is fine.
    Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
      responseAs[String] shouldEqual "OK"
    }

    // However, guest cannot access high-permission operations, like POST.
    Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
      rejection shouldBe AuthorizationFailedRejection
    }

    logoutAndVerifyLockedOut(security, "guest", cookie)
  }

  "admin" should "get protected resource after authentication" in {
    implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(15.seconds)
    val security = newSecurityService()

    val cookie = login(security, user = "admin", password = "admin")

    // After authentication, everything is fine.
    Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
      responseAs[String] shouldEqual "OK"
    }

    // Unlike guest, admin can also access POST.
    Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
      responseAs[String] shouldEqual "OK"
    }

    logoutAndVerifyLockedOut(security, "admin", cookie)
  }
}

object SecurityServiceSpec {

  /** Stub service answering "OK" to GET and POST on `/resource`; it is the route being guarded. */
  val resource: RouteService = new RouteService {
    override def route: Route = {
      get {
        path("resource") {
          complete("OK")
        }
      } ~
      post {
        path("resource") {
          complete("OK")
        }
      }
    }
  }
}
package org.apache.gearpump.services

import scala.concurrent.duration._
import scala.util.Try

import akka.http.scaladsl.model.headers.`Cache-Control`
import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

import org.apache.gearpump.cluster.TestUtil
import org.apache.gearpump.util.Constants
// NOTE: This cannot be removed!!!
import org.apache.gearpump.services.util.UpickleUtil._

/**
 * Route-level tests for [[StaticService]]: the `/version` and `/supervisor-actor-path`
 * endpoints.
 */
class StaticServiceSpec
  extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {

  override def testConfig: Config = TestUtil.UI_CONFIG

  // Supervisor path as configured for the test actor system; handed to the service below.
  private val supervisorPath = system.settings.config.getString(
    Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH)

  /** A fresh route per call, built from the test actor system and the configured path. */
  protected def route = new StaticService(system, supervisorPath).route

  it should "return version" in {
    implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(15.seconds)
    Get("/version") ~> route ~> check {
      responseAs[String] shouldBe "Unknown-Version"

      // This endpoint sends no Cache-Control header, i.e. the response is left cacheable.
      header[`Cache-Control`] shouldBe None
    }
  }

  it should "get correct supervisor path" in {
    implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(15.seconds)
    Get("/supervisor-actor-path") ~> route ~> check {
      // Expected response body when the test configuration is in effect.
      val defaultSupervisorPath = ""
      responseAs[String] shouldBe defaultSupervisorPath
    }
  }
}
package org.apache.gearpump.services

import scala.concurrent.duration._
import scala.util.{Success, Try}

import akka.actor.ActorRef
import akka.http.scaladsl.model.headers.`Cache-Control`
import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
import akka.testkit.TestActor.{AutoPilot, KeepRunning}
import akka.testkit.{TestKit, TestProbe}
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ResolveWorkerId}
import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, ResolveWorkerIdResult, WorkerConfig}
import org.apache.gearpump.cluster.TestUtil
import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
import org.apache.gearpump.jarstore.JarStoreService
// NOTE: This cannot be removed!!!
import org.apache.gearpump.services.util.UpickleUtil._

/**
 * Route-level tests for [[WorkerService]].
 *
 * The master and the worker are replaced by test probes: the master probe resolves every
 * worker id to the worker probe, and the worker probe answers with canned replies.
 */
class WorkerServiceSpec
  extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {

  override def testConfig: Config = TestUtil.DEFAULT_CONFIG

  protected def actorRefFactory = system

  val mockWorker = TestProbe()

  // A def, not a val: mockMaster is initialised further down in the constructor.
  protected def master = mockMaster.ref

  lazy val jarStoreService = JarStoreService.get(system.settings.config)

  protected def workerRoute = new WorkerService(master, system).route

  // Canned worker replies for the three request types the routes under test send.
  mockWorker.setAutoPilot {
    new AutoPilot {
      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
        case GetWorkerData(_) =>
          sender ! WorkerData(WorkerSummary.empty)
          KeepRunning
        case QueryWorkerConfig(_) =>
          sender ! WorkerConfig(null)
          KeepRunning
        case QueryHistoryMetrics(path, _, _, _) =>
          sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
          KeepRunning
      }
    }
  }

  val mockMaster = TestProbe()

  // The master resolves any worker id to the mock worker.
  mockMaster.setAutoPilot {
    new AutoPilot {
      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
        case ResolveWorkerId(_) =>
          sender ! ResolveWorkerIdResult(Success(mockWorker.ref))
          KeepRunning
      }
    }
  }

  "ConfigQueryService" should "return config for worker" in {
    implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(15.seconds)
    (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/config")
      ~> workerRoute) ~> check {
      val responseBody = responseAs[String]
      val config = Try(ConfigFactory.parseString(responseBody))
      assert(config.isSuccess)
    }
  }

  it should "return WorkerData" in {
    implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(15.seconds)
    (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(1, 0L))}")
      ~> workerRoute) ~> check {
      val responseBody = responseAs[String]
      val config = Try(ConfigFactory.parseString(responseBody))
      assert(config.isSuccess)

      // Check the header, should contain the no-cache header.
      // Cache-Control:no-cache, max-age=0
      // Comparing the Option reports a missing header as a normal test failure
      // instead of a NoSuchElementException from Option.get.
      header[`Cache-Control`].map(_.value()) shouldBe Some("no-cache, max-age=0")
    }
  }

  "MetricsQueryService" should "return history metrics" in {
    implicit val customTimeout: RouteTestTimeout = RouteTestTimeout(15.seconds)
    (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/metrics/worker")
      ~> workerRoute) ~> check {
      val responseBody = responseAs[String]
      val config = Try(ConfigFactory.parseString(responseBody))
      assert(config.isSuccess)
    }
  }

  /** Shuts down the test actor system once all tests in this spec have run. */
  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.services.security.oauth2 + +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor.ActorSystem +import akka.http.scaladsl.model.HttpEntity.Strict +import akka.http.scaladsl.model.MediaTypes._ +import akka.http.scaladsl.model.Uri.{Path, Query} +import akka.http.scaladsl.model._ +import akka.http.scaladsl.testkit.ScalatestRouteTest +import com.typesafe.config.ConfigFactory +import org.scalatest.FlatSpec + +import org.apache.gearpump.security.Authenticator +// NOTE: This cannot be removed!!! 
import org.apache.gearpump.services.util.UpickleUtil._
import org.apache.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator

/**
 * Exercises [[CloudFoundryUAAOAuth2Authenticator]] against a local [[MockOAuth2Server]]
 * that plays the role of the UAA host.
 */
class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {

  implicit val actorSystem: ActorSystem = system

  // Mock UAA endpoint; its request handler is installed by the test that needs it.
  private val server = new MockOAuth2Server(system, null)
  server.start()
  private val serverHost = s"http://127.0.0.1:${server.port}"

  val configMap = Map(
    "class" ->
      "org.apache.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator",
    "callback" -> s"$serverHost/login/oauth2/cloudfoundryuaa/callback",
    "clientid" -> "gearpump_test2",
    "clientsecret" -> "gearpump_test2",
    "default-userrole" -> "user",
    "icon" -> "/icons/uaa.png",
    "uaahost" -> serverHost,
    "additional-authenticator-enabled" -> "false")

  val configString = ConfigFactory.parseMap(configMap.asJava)

  // Created on first use, so a spec run that never touches it does no init work up front.
  lazy val uaa = {
    val authenticator = new CloudFoundryUAAOAuth2Authenticator
    authenticator.init(configString, system.dispatcher)
    authenticator
  }

  it should "generate the correct authorization request" in {
    val authorizationQuery = Uri(uaa.getAuthorizationUrl()).query().toMap
    assert(authorizationQuery("response_type") == "code")
    assert(authorizationQuery("client_id") == configMap("clientid"))
    assert(authorizationQuery("redirect_uri") == configMap("callback"))
    assert(authorizationQuery("scope") == "openid,cloud_controller.read")
  }

  it should "authenticate the authorization code and return the correct profile" in {
    val code = Map("code" -> "QGGVeA")
    val accessToken = "e2922002-0218-4513-a62d-1da2ba64ee4c"
    val refreshToken = "eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI2Nm"
    val mail = "[email protected]"

    // Wraps a JSON document in a 200 response with an application/json entity.
    def jsonResponse(json: String): HttpResponse =
      HttpResponse(entity = HttpEntity(ContentType(`application/json`), json))

    // Token endpoint: checks the client credentials and the form fields, then issues tokens.
    def accessTokenEndpoint(request: HttpRequest): HttpResponse = {
      assert(request.getHeader("Authorization").get.value() ==
        "Basic Z2VhcnB1bXBfdGVzdDI6Z2VhcnB1bXBfdGVzdDI=")
      assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded")

      val rawForm = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8")
      val formFields = Uri./.withQuery(Query(rawForm)).query().toMap

      assert(formFields("grant_type") == "authorization_code")
      assert(formFields("code") == "QGGVeA")
      assert(formFields("response_type") == "token")
      assert(formFields("redirect_uri") == configMap("callback"))

      val tokenJson =
        s"""
        |{
        |  "access_token": "$accessToken",
        |  "token_type": "bearer",
        |  "refresh_token": "$refreshToken",
        |  "expires_in": 43199,
        |  "scope": "openid",
        |  "jti": "e8739474-b2fa-42eb-a9ad-e065bf79d7e9"
        |}
        """.stripMargin
      jsonResponse(tokenJson)
    }

    // User-info endpoint: requires the access token issued above and returns the profile.
    def protectedResourceEndpoint(request: HttpRequest): HttpResponse = {
      assert(request.getUri().query().get("access_token").get == accessToken)
      val profileJson =
        s"""
        |{
        |  "user_id": "e2922002-0218-4513-a62d-1da2ba64ee4c",
        |  "user_name": "user",
        |  "email": "$mail"
        |}
        """.stripMargin
      jsonResponse(profileJson)
    }

    // Dispatch on the request path; anything else is an unexpected call and fails the test.
    server.requestHandler = { request: HttpRequest =>
      request.uri.path match {
        case path if path.startsWith(Path("/oauth/token")) =>
          accessTokenEndpoint(request)
        case path if path.startsWith(Path("/userinfo")) =>
          protectedResourceEndpoint(request)
        case _ =>
          fail("Unexpected access to " + request.uri.toString())
      }
    }

    val authenticated = Await.result(uaa.authenticate(code), 30.seconds)
    assert(authenticated.user == mail)
    assert(authenticated.permissionLevel == Authenticator.User.permissionLevel)
  }

  /** Stops the mock server and the authenticator before the route-test cleanup runs. */
  override def cleanUp(): Unit = {
    server.stop()
    uaa.close()
    super.cleanUp()
  }
}
