http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala index 738bbad..e67731e 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,17 +18,21 @@ package io.gearpump.services +import scala.concurrent.Await +import scala.concurrent.duration._ + import akka.actor.ActorSystem +import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} +import com.typesafe.config.Config +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + import io.gearpump.cluster.TestUtil -import akka.http.scaladsl.testkit.{ScalatestRouteTest, RouteTestTimeout} -import com.typesafe.config.{Config, ConfigFactory} -import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec} -import io.gearpump.services.util.UpickleUtil._ -import scala.concurrent.duration._ -import scala.util.Try +// NOTE: This cannot be removed!!! +import io.gearpump.services.util.UpickleUtil._ -class AdminServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll { +class AdminServiceSpec + extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll { override def testConfig: Config = TestUtil.DEFAULT_CONFIG @@ -36,14 +40,14 @@ class AdminServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers w it should "shutdown the ActorSystem when receiving terminate" in { val route = new AdminService(actorSystem).route - implicit val customTimeout = RouteTestTimeout(15 seconds) - (Post(s"/terminate") ~> route) ~> check{ + implicit val customTimeout = RouteTestTimeout(15.seconds) + (Post(s"/terminate") ~> route) ~> check { assert(status.intValue() == 404) } - actorSystem.awaitTermination(20 seconds) + Await.result(actorSystem.whenTerminated, 20.seconds) // terminate should terminate current actor system - assert(actorSystem.isTerminated) + assert(actorSystem.whenTerminated.isCompleted) } }
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala index 7b98cb6..59da80d 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,13 +18,19 @@ package io.gearpump.services +import scala.concurrent.duration._ +import scala.util.{Success, Try} + import akka.actor.ActorRef import akka.http.scaladsl.model.headers.`Cache-Control` -import akka.http.scaladsl.server.RouteResult +import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} import akka.testkit.TestActor.{AutoPilot, KeepRunning} import akka.testkit.{TestKit, TestProbe} import com.typesafe.config.{Config, ConfigFactory} -import akka.http.scaladsl.testkit.RouteTestTimeout +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} +import org.slf4j.Logger +import upickle.default.read + import io.gearpump.cluster.AppMasterToMaster.GeneralAppMasterSummary import io.gearpump.cluster.ClientToMaster.{GetLastFailure, QueryAppMasterConfig, QueryHistoryMetrics, ResolveAppId} import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest} @@ -33,13 +39,8 @@ import io.gearpump.cluster.TestUtil import io.gearpump.jarstore.JarStoreService import io.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig} import io.gearpump.util.LogUtil -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} -import org.slf4j.Logger -import upickle.default.read -import akka.http.scaladsl.testkit.ScalatestRouteTest +// NOTE: This cannot be removed!!! import io.gearpump.services.util.UpickleUtil._ -import scala.concurrent.duration._ -import scala.util.{Success, Try} class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll { @@ -47,7 +48,7 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest override def testConfig: Config = TestUtil.UI_CONFIG private val LOG: Logger = LogUtil.getLogger(getClass) - def actorRefFactory = system + private def actorRefFactory = system val mockAppMaster = TestProbe() val failure = LastFailure(System.currentTimeMillis(), "Some error") @@ -56,13 +57,13 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest def jarStore: JarStoreService = jarStoreService - def master = mockMaster.ref + private def master = mockMaster.ref - def appMasterRoute = new AppMasterService(master, jarStore, system).route + private def appMasterRoute = new AppMasterService(master, jarStore, system).route mockAppMaster.setAutoPilot { new AutoPilot { - def run(sender: ActorRef, msg: Any) = msg match { + def run(sender: ActorRef, msg: Any): AutoPilot = msg match { case AppMasterDataDetailRequest(appId) => sender ! GeneralAppMasterSummary(appId) KeepRunning @@ -78,7 +79,6 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest case QueryExecutorConfig(0) => sender ! ExecutorConfig(system.settings.config) KeepRunning - } } } @@ -86,7 +86,7 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest val mockMaster = TestProbe() mockMaster.setAutoPilot { new AutoPilot { - def run(sender: ActorRef, msg: Any) = msg match { + def run(sender: ActorRef, msg: Any): AutoPilot = msg match { case ResolveAppId(0) => sender ! ResolveAppIdResult(Success(mockAppMaster.ref)) KeepRunning @@ -102,20 +102,19 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest "AppMasterService" should "return a JSON structure for GET request when detail = false" in { implicit val customTimeout = RouteTestTimeout(15.seconds) - Get(s"/api/$REST_VERSION/appmaster/0?detail=false") ~> appMasterRoute ~> check{ + Get(s"/api/$REST_VERSION/appmaster/0?detail=false") ~> appMasterRoute ~> check { val responseBody = responseAs[String] read[AppMasterData](responseBody) - // check the header, should contains no-cache header. + // Checks the header, should contains no-cache header. // Cache-Control:no-cache, max-age=0 val noCache = header[`Cache-Control`].get.value() assert(noCache == "no-cache, max-age=0") } - Get(s"/api/$REST_VERSION/appmaster/0?detail=true") ~> appMasterRoute ~> check{ + Get(s"/api/$REST_VERSION/appmaster/0?detail=true") ~> appMasterRoute ~> check { val responseBody = responseAs[String] } - } "MetricsQueryService" should "return history metrics" in { @@ -137,7 +136,7 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest "ConfigQueryService" should "return config for application" in { implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/appmaster/0/config") ~> appMasterRoute) ~> check{ + (Get(s"/api/$REST_VERSION/appmaster/0/config") ~> appMasterRoute) ~> check { val responseBody = responseAs[String] val config = Try(ConfigFactory.parseString(responseBody)) assert(config.isSuccess) @@ -146,7 +145,7 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest it should "return config for executor " in { implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/appmaster/0/executor/0/config") ~> appMasterRoute) ~> check{ + (Get(s"/api/$REST_VERSION/appmaster/0/executor/0/config") ~> appMasterRoute) ~> check { val responseBody = responseAs[String] val config = Try(ConfigFactory.parseString(responseBody)) assert(config.isSuccess) @@ -155,14 +154,13 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest it should "return return executor summary" in { implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/appmaster/0/executor/0") ~> appMasterRoute) ~> check{ + (Get(s"/api/$REST_VERSION/appmaster/0/executor/0") ~> appMasterRoute) ~> check { val responseBody = responseAs[String] val executorSummary = read[ExecutorSummary](responseBody) assert(executorSummary.id == 0) } } - override def afterAll { TestKit.shutdownActorSystem(system) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala index 1cfc01a..a24db18 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,55 +19,55 @@ package io.gearpump.services import java.io.File +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Success, Try} import akka.actor.ActorRef import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ -import akka.http.scaladsl.model.headers.{`Cache-Control`, `Set-Cookie`} -import akka.stream.scaladsl.Source +import akka.http.scaladsl.model.headers.`Cache-Control` +import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} +import akka.stream.scaladsl.{FileIO, Source} import akka.testkit.TestActor.{AutoPilot, KeepRunning} import akka.testkit.TestProbe import com.typesafe.config.{Config, ConfigFactory} -import io.gearpump.WorkerId +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData} import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ResolveWorkerId, SubmitApplication} import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData, AppMastersDataRequest, WorkerList} import io.gearpump.cluster.MasterToClient._ import io.gearpump.cluster.TestUtil -import io.gearpump.cluster.worker.WorkerSummary -import io.gearpump.services.MasterService.{SubmitApplicationRequest, BuiltinPartitioners} +import io.gearpump.cluster.worker.{WorkerId, WorkerSummary} import io.gearpump.jarstore.JarStoreService -import io.gearpump.streaming.ProcessorDescription -import io.gearpump.util.{Constants, Graph} -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} -import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} -import scala.concurrent.{Future, ExecutionContext} -import scala.concurrent.duration._ -import scala.util.{Success, Try} -import akka.stream.scaladsl.FileIO +import io.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest} +// NOTE: This cannot be removed!!! import io.gearpump.services.util.UpickleUtil._ +import io.gearpump.streaming.ProcessorDescription +import io.gearpump.util.Graph -class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with - Matchers with BeforeAndAfterAll { +class MasterServiceSpec extends FlatSpec with ScalatestRouteTest + with Matchers with BeforeAndAfterAll { import upickle.default.{read, write} override def testConfig: Config = TestUtil.UI_CONFIG - def actorRefFactory = system + private def actorRefFactory = system val workerId = 0 val mockWorker = TestProbe() lazy val jarStoreService = JarStoreService.get(system.settings.config) - def master = mockMaster.ref + private def master = mockMaster.ref def jarStore: JarStoreService = jarStoreService - def masterRoute = new MasterService(master, jarStore, system).route + private def masterRoute = new MasterService(master, jarStore, system).route mockWorker.setAutoPilot { new AutoPilot { - def run(sender: ActorRef, msg: Any) = msg match { + def run(sender: ActorRef, msg: Any): AutoPilot = msg match { case GetWorkerData(workerId) => sender ! WorkerData(WorkerSummary.empty.copy(state = "active", workerId = workerId)) KeepRunning @@ -81,11 +81,11 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with val mockMaster = TestProbe() mockMaster.setAutoPilot { new AutoPilot { - def run(sender: ActorRef, msg: Any) = msg match { + def run(sender: ActorRef, msg: Any): AutoPilot = msg match { case GetMasterData => sender ! MasterData(null) KeepRunning - case AppMastersDataRequest => + case AppMastersDataRequest => sender ! AppMastersData(List.empty[AppMasterData]) KeepRunning case GetAllWorkers => @@ -107,15 +107,14 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with } } - it should "return master info when asked" in { implicit val customTimeout = RouteTestTimeout(15.seconds) (Get(s"/api/$REST_VERSION/master") ~> masterRoute) ~> check { - // check the type + // Checks the type val content = responseAs[String] read[MasterData](content) - // check the header, should contains no-cache header. + // Checks the header, should contains no-cache header. // Cache-Control:no-cache, max-age=0 val noCache = header[`Cache-Control`].get.value() assert(noCache == "no-cache, max-age=0") @@ -127,7 +126,7 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with it should "return a json structure of appMastersData for GET request" in { implicit val customTimeout = RouteTestTimeout(15.seconds) (Get(s"/api/$REST_VERSION/master/applist") ~> masterRoute) ~> check { - //check the type + // Checks the type read[AppMastersData](responseAs[String]) } mockMaster.expectMsg(AppMastersDataRequest) @@ -136,7 +135,7 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with it should "return a json structure of worker data for GET request" in { implicit val customTimeout = RouteTestTimeout(25.seconds) Get(s"/api/$REST_VERSION/master/workerlist") ~> masterRoute ~> check { - //check the type + // Checks the type val workerListJson = responseAs[String] val workers = read[List[WorkerSummary]](workerListJson) assert(workers.size > 0) @@ -151,7 +150,7 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with it should "return config for master" in { implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/master/config") ~> masterRoute) ~> check{ + (Get(s"/api/$REST_VERSION/master/config") ~> masterRoute) ~> check { val responseBody = responseAs[String] val config = Try(ConfigFactory.parseString(responseBody)) assert(config.isSuccess) @@ -170,7 +169,7 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with } private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = { - val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(), + val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromFile(file, chunkSize = 100000)) val body = Source.single( @@ -185,7 +184,7 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with "MetricsQueryService" should "return history metrics" in { implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/master/metrics/master") ~> masterRoute)~> check { + (Get(s"/api/$REST_VERSION/master/metrics/master") ~> masterRoute) ~> check { val responseBody = responseAs[String] val config = Try(ConfigFactory.parseString(responseBody)) assert(config.isSuccess) @@ -200,7 +199,8 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with ) val dag = Graph(0 ~ "partitioner" ~> 1) val jsonValue = write(SubmitApplicationRequest("complexdag", processors, dag, null)) - Post(s"/api/$REST_VERSION/master/submitdag", HttpEntity(ContentTypes.`application/json`, jsonValue)) ~> masterRoute ~> check { + Post(s"/api/$REST_VERSION/master/submitdag", + HttpEntity(ContentTypes.`application/json`, jsonValue)) ~> masterRoute ~> check { val responseBody = responseAs[String] val submitApplicationResultValue = read[SubmitApplicationResultValue](responseBody) assert(submitApplicationResultValue.appId >= 0, "invalid appid") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala index 10b45b9..3cf99e9 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,23 +18,23 @@ package io.gearpump.services -import akka.http.scaladsl.testkit.{RouteTestTimeout} -import com.typesafe.config.Config -import io.gearpump.cluster.TestUtil -import io.gearpump.services.util.UpickleUtil._ -import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec} -import akka.actor.{ActorSystem} -import akka.http.scaladsl.server._ import scala.concurrent.duration._ -import akka.http.scaladsl.model._ -import headers._ + +import akka.actor.ActorSystem import akka.http.scaladsl.model.FormData -import akka.http.scaladsl.model.headers.{Cookie, `Set-Cookie`} -import akka.http.scaladsl.server.AuthorizationFailedRejection +import akka.http.scaladsl.model.headers.{Cookie, `Set-Cookie`, _} import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.testkit.ScalatestRouteTest +import akka.http.scaladsl.server.{AuthorizationFailedRejection, _} +import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} +import com.typesafe.config.Config +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} -class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll { +import io.gearpump.cluster.TestUtil +// NOTE: This cannot be removed!!! +import io.gearpump.services.util.UpickleUtil._ + +class SecurityServiceSpec + extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll { override def testConfig: Config = TestUtil.UI_CONFIG @@ -43,9 +43,9 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher it should "return 401 if not authenticated" in { val security = new SecurityService(SecurityServiceSpec.resource, actorSystem) - implicit val customTimeout = RouteTestTimeout(15 seconds) + implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/resource") ~> security.route) ~> check{ + (Get(s"/resource") ~> security.route) ~> check { assert(rejection.isInstanceOf[AuthenticationFailedRejection]) } } @@ -53,10 +53,11 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher "guest" should "get protected resource after authentication" in { val security = new SecurityService(SecurityServiceSpec.resource, actorSystem) - implicit val customTimeout = RouteTestTimeout(15 seconds) + implicit val customTimeout = RouteTestTimeout(15.seconds) var cookie: HttpCookiePair = null - (Post(s"/login", FormData("username" -> "guest", "password" -> "guest")) ~> security.route) ~> check{ + (Post(s"/login", FormData("username" -> "guest", "password" -> "guest")) + ~> security.route) ~> check { assert("{\"user\":\"guest\"}" == responseAs[String]) assert(status.intValue() == 200) assert(header[`Set-Cookie`].isDefined) @@ -65,18 +66,18 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher cookie = HttpCookiePair.apply(httpCookie.name, httpCookie.value) } - // after authentication, everything is fine. + // After authentication, everything is fine. Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check { responseAs[String] shouldEqual "OK" } - // however, guest cannot access high-permission operations, like POST. - Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check { + // However, guest cannot access high-permission operations, like POST. + Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check { assert(rejection == AuthorizationFailedRejection) } - // logout, should clear the session - Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check{ + // Logout, should clear the session + Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check { assert("{\"user\":\"guest\"}" == responseAs[String]) assert(status.intValue() == 200) assert(header[`Set-Cookie`].isDefined) @@ -85,12 +86,12 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher assert(httpCookie.value == "deleted") } - // access again, rejected. - Get("/resource") ~> security.route ~> check { + // Access again, rejected this time. + Get("/resource") ~> security.route ~> check { assert(rejection.isInstanceOf[AuthenticationFailedRejection]) } - Post("/resource") ~> security.route ~> check { + Post("/resource") ~> security.route ~> check { assert(rejection.isInstanceOf[AuthenticationFailedRejection]) } } @@ -98,10 +99,11 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher "admin" should "get protected resource after authentication" in { val security = new SecurityService(SecurityServiceSpec.resource, actorSystem) - implicit val customTimeout = RouteTestTimeout(15 seconds) + implicit val customTimeout = RouteTestTimeout(15.seconds) var cookie: HttpCookiePair = null - (Post(s"/login", FormData("username" -> "admin", "password" -> "admin")) ~> security.route) ~> check{ + (Post(s"/login", FormData("username" -> "admin", "password" -> "admin")) + ~> security.route) ~> check { assert("{\"user\":\"admin\"}" == responseAs[String]) assert(status.intValue() == 200) assert(header[`Set-Cookie`].isDefined) @@ -110,7 +112,7 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher cookie = HttpCookiePair(httpCookie.name, httpCookie.value) } - // after authentication, everything is fine. + // After authentication, everything is fine. Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check { responseAs[String] shouldEqual "OK" } @@ -120,8 +122,8 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher responseAs[String] shouldEqual "OK" } - // logout, should clear the session - Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check{ + // Logout, should clear the session + Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check { assert("{\"user\":\"admin\"}" == responseAs[String]) assert(status.intValue() == 200) assert(header[`Set-Cookie`].isDefined) @@ -130,8 +132,8 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher assert(httpCookie.value == "deleted") } - // access again, rejected. - Get("/resource") ~> security.route ~> check { + // Access again, rejected this time. + Get("/resource") ~> security.route ~> check { assert(rejection.isInstanceOf[AuthenticationFailedRejection]) } @@ -145,7 +147,7 @@ object SecurityServiceSpec { val resource = new RouteService { override def route: Route = { - get{ + get { path("resource") { complete("OK") } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala index 501776e..f61e2f5 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,41 +18,43 @@ package io.gearpump.services +import scala.concurrent.duration._ +import scala.util.Try + import akka.http.scaladsl.model.headers.`Cache-Control` import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} -import akka.testkit.TestProbe import com.typesafe.config.{Config, ConfigFactory} +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + import io.gearpump.cluster.TestUtil import io.gearpump.util.Constants +// NOTE: This cannot be removed!!! import io.gearpump.services.util.UpickleUtil._ -import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec} -import scala.util.Try -import scala.concurrent.duration._ - -class StaticServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll { +class StaticServiceSpec + extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll { override def testConfig: Config = TestUtil.UI_CONFIG - private val supervisorPath = system.settings.config.getString(Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH) - + private val supervisorPath = system.settings.config.getString( + Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH) - def route = new StaticService(system, supervisorPath).route + protected def route = new StaticService(system, supervisorPath).route it should "return version" in { - implicit val customTimeout = RouteTestTimeout(15 seconds) - (Get(s"/version") ~> route) ~> check{ + implicit val customTimeout = RouteTestTimeout(15.seconds) + (Get(s"/version") ~> route) ~> check { val responseBody = responseAs[String] val config = Try(ConfigFactory.parseString(responseBody)) assert(responseBody == "Unknown-Version") - // by default, it will be cached. + // By default, it will be cached. assert(header[`Cache-Control`].isEmpty) } } it should "get correct supervisor path" in { - implicit val customTimeout = RouteTestTimeout(15 seconds) - (Get(s"/supervisor-actor-path") ~> route) ~> check{ + implicit val customTimeout = RouteTestTimeout(15.seconds) + (Get(s"/supervisor-actor-path") ~> route) ~> check { val responseBody = responseAs[String] val defaultSupervisorPath = "" assert(responseBody == defaultSupervisorPath) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala index dc8d5a7..2676a16 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,42 +18,44 @@ package io.gearpump.services +import scala.concurrent.duration._ +import scala.util.{Success, Try} + import akka.actor.ActorRef import akka.http.scaladsl.model.headers.`Cache-Control` +import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} import akka.testkit.TestActor.{AutoPilot, KeepRunning} import akka.testkit.{TestKit, TestProbe} import com.typesafe.config.{Config, ConfigFactory} -import io.gearpump.WorkerId +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData} import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ResolveWorkerId} import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, ResolveWorkerIdResult, WorkerConfig} import io.gearpump.cluster.TestUtil -import io.gearpump.cluster.worker.WorkerSummary +import io.gearpump.cluster.worker.{WorkerId, WorkerSummary} import io.gearpump.jarstore.JarStoreService +// NOTE: This cannot be removed!!! import io.gearpump.services.util.UpickleUtil._ -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import scala.concurrent.duration._ -import scala.util.{Success, Try} -import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} -class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll { +class WorkerServiceSpec + extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll { override def testConfig: Config = TestUtil.DEFAULT_CONFIG - def actorRefFactory = system + protected def actorRefFactory = system val mockWorker = TestProbe() - def master = mockMaster.ref + protected def master = mockMaster.ref lazy val jarStoreService = JarStoreService.get(system.settings.config) - def workerRoute = new WorkerService(master, system).route + protected def workerRoute = new WorkerService(master, system).route mockWorker.setAutoPilot { new AutoPilot { - def run(sender: ActorRef, msg: Any) = msg match { + def run(sender: ActorRef, msg: Any): AutoPilot = msg match { case GetWorkerData(workerId) => sender ! WorkerData(WorkerSummary.empty) KeepRunning @@ -70,7 +72,7 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers val mockMaster = TestProbe() mockMaster.setAutoPilot { new AutoPilot { - def run(sender: ActorRef, msg: Any) = msg match { + def run(sender: ActorRef, msg: Any): AutoPilot = msg match { case ResolveWorkerId(workerId) => sender ! ResolveWorkerIdResult(Success(mockWorker.ref)) KeepRunning @@ -78,10 +80,10 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers } } - "ConfigQueryService" should "return config for worker" in { implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/config") ~> workerRoute) ~> check{ + (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/config") + ~> workerRoute) ~> check { val responseBody = responseAs[String] val config = Try(ConfigFactory.parseString(responseBody)) assert(config.isSuccess) @@ -90,12 +92,13 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers it should "return WorkerData" in { implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(1, 0L))}") ~> workerRoute) ~> check{ + (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(1, 0L))}") + ~> workerRoute) ~> check { val responseBody = responseAs[String] val config = Try(ConfigFactory.parseString(responseBody)) assert(config.isSuccess) - // check the header, should contains no-cache header. + // Check the header, should contains no-cache header. // Cache-Control:no-cache, max-age=0 val noCache = header[`Cache-Control`].get.value() assert(noCache == "no-cache, max-age=0") @@ -104,7 +107,8 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers "MetricsQueryService" should "return history metrics" in { implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/metrics/worker") ~> workerRoute) ~> check { + (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/metrics/worker") + ~> workerRoute) ~> check { val responseBody = responseAs[String] val config = Try(ConfigFactory.parseString(responseBody)) assert(config.isSuccess) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala index 68a3506..136026a 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,19 +18,23 @@ package io.gearpump.services.security.oauth2 +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration._ + import akka.actor.ActorSystem import akka.http.scaladsl.model.HttpEntity.Strict import akka.http.scaladsl.model.MediaTypes._ -import akka.http.scaladsl.model.Uri.{Query, Path} +import akka.http.scaladsl.model.Uri.{Path, Query} import akka.http.scaladsl.model._ import akka.http.scaladsl.testkit.ScalatestRouteTest import com.typesafe.config.ConfigFactory -import io.gearpump.security.Authenticator -import io.gearpump.services.security.oauth2.impl.{CloudFoundryUAAOAuth2Authenticator, GoogleOAuth2Authenticator} import org.scalatest.FlatSpec -import scala.concurrent.Await -import scala.concurrent.duration._ -import scala.collection.JavaConverters._ + +import io.gearpump.security.Authenticator +// NOTE: This cannot be removed!!! +import io.gearpump.services.util.UpickleUtil._ +import io.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest { @@ -71,8 +75,9 @@ class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRout val refreshToken = "eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI2Nm" val mail = "[email protected]" - def accessTokenEndpoint(request: HttpRequest) = { - assert(request.getHeader("Authorization").get.value() == "Basic Z2VhcnB1bXBfdGVzdDI6Z2VhcnB1bXBfdGVzdDI=") + def accessTokenEndpoint(request: HttpRequest): HttpResponse = { + assert(request.getHeader("Authorization").get.value() == + "Basic Z2VhcnB1bXBfdGVzdDI6Z2VhcnB1bXBfdGVzdDI=") assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded") val body = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8") @@ -85,27 +90,27 @@ class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRout val response = s""" - |{ - | "access_token": "$accessToken", - | "token_type": "bearer", - | "refresh_token": "$refreshToken", - | "expires_in": 43199, - | "scope": "openid", - | "jti": "e8739474-b2fa-42eb-a9ad-e065bf79d7e9" - |} + |{ + | "access_token": "$accessToken", + | "token_type": "bearer", + | "refresh_token": "$refreshToken", + | "expires_in": 43199, + | "scope": "openid", + | "jti": "e8739474-b2fa-42eb-a9ad-e065bf79d7e9" + |} """.stripMargin HttpResponse(entity = HttpEntity(ContentType(`application/json`), response)) } - def protectedResourceEndpoint(request: HttpRequest) = { + def protectedResourceEndpoint(request: HttpRequest): HttpResponse = { assert(request.getUri().query().get("access_token").get == accessToken) val response = s""" - |{ - | "user_id": "e2922002-0218-4513-a62d-1da2ba64ee4c", - | "user_name": "user", - | "email": "$mail" - |} + |{ + | "user_id": "e2922002-0218-4513-a62d-1da2ba64ee4c", + | "user_name": "user", + | "email": "$mail" + |} """.stripMargin HttpResponse(entity = HttpEntity(ContentType(`application/json`), response)) } @@ -121,7 +126,7 @@ class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRout } val userFuture = uaa.authenticate(code) - val user = Await.result(userFuture, 30 seconds) + val user = Await.result(userFuture, 30.seconds) assert(user.user == mail) assert(user.permissionLevel == Authenticator.User.permissionLevel) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala index 58b4a34..70d8bb0 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,21 +18,24 @@ package io.gearpump.services.security.oauth2 +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration._ + import akka.actor.ActorSystem import akka.http.scaladsl.model.HttpEntity.Strict import akka.http.scaladsl.model.MediaTypes._ -import akka.http.scaladsl.model.Uri.{Query, Path} +import akka.http.scaladsl.model.Uri.{Path, Query} import akka.http.scaladsl.model._ import akka.http.scaladsl.testkit.ScalatestRouteTest import com.typesafe.config.ConfigFactory -import io.gearpump.security.Authenticator -import io.gearpump.services.security.oauth2.GoogleOAuth2AuthenticatorSpec.MockGoogleAuthenticator -import io.gearpump.services.security.oauth2.impl.{GoogleOAuth2Authenticator, CloudFoundryUAAOAuth2Authenticator} import org.scalatest.FlatSpec -import scala.collection.JavaConverters._ -import scala.concurrent.Await -import scala.concurrent.duration._ +import io.gearpump.security.Authenticator +// NOTE: This cannot be removed!!! +import io.gearpump.services.util.UpickleUtil._ +import io.gearpump.services.security.oauth2.GoogleOAuth2AuthenticatorSpec.MockGoogleAuthenticator +import io.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest { @@ -71,7 +74,7 @@ class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest { val refreshToken = "eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI2Nm" val mail = "[email protected]" - def accessTokenEndpoint(request: HttpRequest) = { + def accessTokenEndpoint(request: HttpRequest): HttpResponse = { assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded") @@ -85,33 +88,37 @@ class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest { assert(form("redirect_uri") == configMap("callback")) assert(form("scope") == GoogleOAuth2Authenticator.Scope) - val response = s""" - |{ - | "access_token": "$accessToken", - | "token_type": "Bearer", - | "expires_in": 3591, - | "id_token": "eyJhbGciOiJSUzI1NiIsImtpZCI6ImY1NjQyYzY2MzdhYWQyOTJiOThlOGIwN2MwMzIxN2QwMzBmOTdkODkifQ.eyJpc3" - |} + // scalastyle:off line.size.limit + val response = + s""" + |{ + | "access_token": "$accessToken", + | "token_type": "Bearer", + | "expires_in": 3591, + | "id_token": "eyJhbGciOiJSUzI1NiIsImtpZCI6ImY1NjQyYzY2MzdhYWQyOTJiOThlOGIwN2MwMzIxN2QwMzBmOTdkODkifQ.eyJpc3" + |} """.stripMargin + // scalastyle:on line.size.limit HttpResponse(entity = HttpEntity(ContentType(`application/json`), response)) } - def protectedResourceEndpoint(request: HttpRequest) = { + def protectedResourceEndpoint(request: HttpRequest): HttpResponse = { assert(request.getUri().query().get("access_token").get == accessToken) - val response =s""" - |{ - | "kind": "plus#person", - | "etag": "4OZ_Kt6ujOh1jaML_U6RM6APqoE/mZ57HcMOYXaNXYXS5XEGJ9yVsI8", - | "nickname": "gearpump", - | "gender": "female", - | "emails": [ - | { - | "value": "$mail", - | "type": "account" - | } - | ] - | } + val response = + s""" + |{ + | "kind": "plus#person", + | "etag": "4OZ_Kt6ujOh1jaML_U6RM6APqoE/mZ57HcMOYXaNXYXS5XEGJ9yVsI8", + | "nickname": "gearpump", + | "gender": "female", + | "emails": [ + | { + | "value": "$mail", + | "type": "account" + | } + | ] + | } """.stripMargin HttpResponse(entity = HttpEntity(ContentType(`application/json`), response)) } @@ -127,7 +134,7 @@ class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest { } val userFuture = google.authenticate(code) - val user = Await.result(userFuture, 30 seconds) + val user = Await.result(userFuture, 30.seconds) assert(user.user == mail) assert(user.permissionLevel == Authenticator.Guest.permissionLevel) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala index 9b6b5e9..c03532d 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,18 +18,18 @@ package io.gearpump.services.security.oauth2 -import akka.actor.{ActorRef, ActorSystem} +import scala.concurrent.{Await, Future} + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http import akka.http.scaladsl.Http.ServerBinding -import akka.http.scaladsl.model.{HttpResponse, HttpRequest} -import akka.http.scaladsl.server.Route +import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.stream.ActorMaterializer -import akka.http.scaladsl.Http -import akka.http.scaladsl.server.Directives._ -import akka.stream.scaladsl.{Sink, Source} -import io.gearpump.util.Util -import akka.pattern.ask +import akka.stream.scaladsl.Sink -import scala.concurrent.{Await, Future} +import io.gearpump.util.Util +// NOTE: This cannot be removed!! +import io.gearpump.services.util.UpickleUtil._ /** * Serves as a fake OAuth2 server. @@ -48,12 +48,11 @@ class MockOAuth2Server( def port: Int = _port def start(): Unit = { - _port = Util.findFreePort.get + _port = Util.findFreePort().get val serverSource = Http().bind(interface = "127.0.0.1", port = _port) bindingFuture = { serverSource.to(Sink.foreach { connection => - println("Accepted new connection from " + connection.remoteAddress) connection handleWithSyncHandler requestHandler }).run() } @@ -61,6 +60,6 @@ class MockOAuth2Server( def stop(): Unit = { import scala.concurrent.duration._ - Await.result(bindingFuture.map(_.unbind()), 120 seconds) + Await.result(bindingFuture.map(_.unbind()), 120.seconds) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala index 9b3069e..b074813 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,14 +18,15 @@ package io.gearpump.services.util +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} +import upickle.default.{read, write} + import io.gearpump.cluster.UserConfig import io.gearpump.metrics.Metrics.{Counter, MetricType} import io.gearpump.services.util.UpickleUtil._ import io.gearpump.streaming.ProcessorId import io.gearpump.streaming.appmaster.{ProcessorSummary, StreamAppMasterSummary} import io.gearpump.util.Graph -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} -import upickle.default.{read, write} class UpickleSpec extends FlatSpec with Matchers with BeforeAndAfterEach { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/README.md ---------------------------------------------------------------------- diff --git a/streaming/README.md b/streaming/README.md deleted file mode 100644 index 86d4ec2..0000000 --- a/streaming/README.md +++ /dev/null @@ -1,116 +0,0 @@ -A very basic DSL which support flatmap, reduce, and etc.. -====================== - - -Supported operators: ------------------- -```scala -class Stream[T](dag: Graph[Op, OpEdge], private val thisNode: Op, private val edge: Option[OpEdge] = None) { - - /** - * convert a value[T] to a list of value[R] - * @param fun - * @tparam R - * @return - */ - def flatMap[R](fun: T => TraversableOnce[R]): Stream[R] - - /** - * convert value[T] to value[R] - * @param fun - * @tparam R - * @return - */ - def map[R](fun: T => R): Stream[R] - - /** - * reserve records when fun(T) == true - * @param fun - * @return - */ - def filter(fun: T => Boolean): Stream[T] - - /** - * Reduce opeartion - * @param fun - * @return - */ - def reduce(fun: (T, T) => T): Stream[T] - - /** - * Log to task log file - */ - def log(): Unit - - /** - * Merge data from two stream into one - * @param other - * @return - */ - def merge(other: Stream[T]): Stream[T] - - /** - * Group by fun(T) - * - * For example, we have T type, People(name: String, gender: String, age: Int) - * groupBy[People](_.gender) will group the people by gender. - * - * You can append other combinators after groupBy - * - * For example, - * - * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..) - * - * @param fun - * @param parallism - * @tparam Group - * @return - */ - def groupBy[Group](fun: T => Group, parallism: Int = 1): Stream[T] - - /** - * connect with a low level Processor(TaskDescription) - * @param processor - * @param parallism - * @tparam R - * @return - */ - def process[R](processor: Class[_ <: Task], parallism: Int): Stream[R] -} - -``` - -How to define the DSL ---------------- -WordCount: - -```scala -val context = ClientContext(master) - val app = StreamApp("dsl", context) - - val data = "This is a good start, bingo!! bingo!!" - app.fromCollection(data.lines.toList). - // word => (word, count) - flatMap(line => line.split("[\\s]+")).map((_, 1)). - // (word, count1), (word, count2) => (word, count1 + count2) - groupBy(kv => kv._1).reduce((left, right) => (left._1, left._2 + right._2)) - - val appId = context.submit(app) - context.close() -``` - -For the full example, please check https://github.com/intel-hadoop/gearpump/tree/master/experiments/dsl/src/main/scala/io.gearpump/streaming/dsl/example - - -Run an example ---------------------- -```bash -# start master -bin\local - -# start UI -bin\services - -# start example topology -bin\gear io.gearpump.streaming.dsl.example.WordCount -``` \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java index 8b81253..dbae1c4 100644 --- a/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java +++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,8 +19,15 @@ package io.gearpump.streaming.javaapi; import io.gearpump.partitioner.Partitioner; +import io.gearpump.streaming.Processor; +import io.gearpump.streaming.task.Task; -public class Graph extends io.gearpump.util.Graph<io.gearpump.streaming.Processor<? extends io.gearpump.streaming.task.Task>, Partitioner> { +/** + * Java version of Graph + * + * See {@link io.gearpump.util.Graph} + */ +public class Graph extends io.gearpump.util.Graph<Processor<? extends Task>, Partitioner> { public Graph() { super(null, null); http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java index 52b5d61..974183e 100644 --- a/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java +++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -27,6 +27,11 @@ import io.gearpump.streaming.source.DataSource; import io.gearpump.streaming.source.DataSourceProcessor; import io.gearpump.streaming.source.DataSourceTask; +/** + * Java version of Processor + * + * See {@link io.gearpump.streaming.Processor} + */ public class Processor<T extends io.gearpump.streaming.task.Task> implements io.gearpump.streaming.Processor<T> { private Class<T> _taskClass; private int _parallelism = 1; @@ -43,46 +48,49 @@ public class Processor<T extends io.gearpump.streaming.task.Task> implements io. } /** - * Create a Sink Processor - * @param dataSink the data sink itself - * @param parallelism the parallelism of this processor - * @param description the description for this processor - * @param taskConf the configuration for this processor - * @param system actor system - * @return the new created sink processor + * Creates a Sink Processor + * + * @param dataSink the data sink itself + * @param parallelism the parallelism of this processor + * @param description the description for this processor + * @param taskConf the configuration for this processor + * @param system actor system + * @return the new created sink processor */ - public static Processor<DataSinkTask> sink(DataSink dataSink, int parallelism, String description, UserConfig taskConf, ActorSystem system) { + public static Processor<DataSinkTask> sink(DataSink dataSink, int parallelism, String description, UserConfig taskConf, ActorSystem system) { io.gearpump.streaming.Processor<DataSinkTask> p = DataSinkProcessor.apply(dataSink, parallelism, description, taskConf, system); return new Processor(p); } /** - * Create a Source Processor - * @param source the data source itself - * @param parallelism the parallelism of this processor - * @param description the description of this processor - * @param taskConf the configuration of this processor - * @param system actor system - * @return the new created source processor + * Creates a Source Processor + * + * @param source the data source itself + * @param parallelism the parallelism of this processor + * @param description the description of this processor + * @param taskConf the configuration of this processor + * @param system actor system + * @return the new created source processor */ - public static Processor<DataSourceTask> source(DataSource source, int parallelism, String description, UserConfig taskConf, ActorSystem system) { + public static Processor<DataSourceTask> source(DataSource source, int parallelism, String description, UserConfig taskConf, ActorSystem system) { io.gearpump.streaming.Processor<DataSourceTask> p = DataSourceProcessor.apply(source, parallelism, description, taskConf, system); return new Processor(p); } public Processor(io.gearpump.streaming.Processor<T> processor) { - this._taskClass = (Class)(processor.taskClass()); + this._taskClass = (Class) (processor.taskClass()); this._parallelism = processor.parallelism(); this._description = processor.description(); this._userConf = processor.taskConf(); } /** - * Create a general processor with user specified task logic. - * @param taskClass task implementation class of this processor (shall be a derived class from {@link Task} + * Creates a general processor with user specified task logic. + * + * @param taskClass task implementation class of this processor (shall be a derived class from {@link Task} * @param parallelism, how many initial tasks you want to use * @param description, some text to describe this processor - * @param taskConf, Processor specific configuration + * @param taskConf, Processor specific configuration */ public Processor(Class<T> taskClass, int parallelism, String description, UserConfig taskConf) { this._taskClass = taskClass; http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java index d017793..150a26f 100644 --- a/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java +++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,7 +16,6 @@ * limitations under the License. */ - package io.gearpump.streaming.javaapi; import akka.actor.ActorSystem; @@ -24,21 +23,25 @@ import io.gearpump.cluster.Application; import io.gearpump.cluster.ApplicationMaster; import io.gearpump.cluster.UserConfig; - +/** + * Java version of StreamApplication. + * + * Also see {@link io.gearpump.streaming.StreamApplication} + */ public class StreamApplication implements Application { private io.gearpump.streaming.StreamApplication app; /** - * Create a streaming application - * @param name name of the application - * @param conf user configuration - * @param graph the DAG + * Creates a streaming application * + * @param name Name of the application + * @param conf User configuration + * @param graph The DAG */ public StreamApplication(String name, UserConfig conf, Graph graph) { //by pass the tricky type check in scala 2.10 io.gearpump.util.Graph untypedGraph = graph; this.app = io.gearpump.streaming.StreamApplication.apply( - name, untypedGraph, conf); + name, untypedGraph, conf); } @Override http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java index 836287d..45fae19 100644 --- a/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java +++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -24,6 +24,11 @@ import io.gearpump.cluster.UserConfig; import io.gearpump.streaming.task.StartTime; import io.gearpump.streaming.task.TaskContext; +/** + * Java version of Task. + * + * See {@link io.gearpump.streaming.task.Task} + */ public class Task extends io.gearpump.streaming.task.Task { protected TaskContext context; protected UserConfig userConf; @@ -40,11 +45,14 @@ public class Task extends io.gearpump.streaming.task.Task { } @Override - public void onStart(StartTime startTime){} + public void onStart(StartTime startTime) { + } @Override - public void onNext(Message msg){} + public void onNext(Message msg) { + } @Override - public void onStop(){} + public void onStop() { + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java index e4e137f..bb97442 100644 --- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java +++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,7 +21,9 @@ package io.gearpump.streaming.javaapi.dsl.functions; import java.io.Serializable; /** - * a function that decides whether to reserve a value<T> + * Filter function + * + * @param <T> Message of type T */ public interface FilterFunction<T> extends Serializable { boolean apply(T t); http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java index b65a338..3e18cf1 100644 --- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java +++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,7 +22,10 @@ import java.io.Serializable; import java.util.Iterator; /** - * a function that converts a value<T> to a iterator of value<R> + * Function that converts a value of type T to a iterator of values of type R. + * + * @param <T> Input value type + * @param <R> Return value type */ public interface FlatMapFunction<T, R> extends Serializable { Iterator<R> apply(T t); http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java index 651c477..2ba524e 100644 --- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java +++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,7 +21,10 @@ package io.gearpump.streaming.javaapi.dsl.functions; import java.io.Serializable; /** - * a function that puts a value<T> into a Group + * GroupBy function which assign value of type T to groups + * + * @param <T> Input value type + * @param <Group> Group Type */ public interface GroupByFunction<T, Group> extends Serializable { Group apply(T t); http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java index a30a671..b4bd6ac 100644 --- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java +++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,7 +21,10 @@ package io.gearpump.streaming.javaapi.dsl.functions; import java.io.Serializable; /** - * a function that converts a value<T> to value<R> + * Function that map a value of type T to value of type R + * + * @param <T> Input value type + * @param <R> Output value type */ public interface MapFunction<T, R> extends Serializable { R apply(T t); http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java index 0f4bb18..f439c0a 100644 --- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java +++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,7 +21,9 @@ package io.gearpump.streaming.javaapi.dsl.functions; import java.io.Serializable; /** - * a function that applies reduce operation + * Function that applies reduce operation + * + * @param <T> Input value type */ public interface ReduceFunction<T> extends Serializable { T apply(T t1, T t2); http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala b/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala index 370b36c..06cf022 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,29 +18,34 @@ package io.gearpump.streaming +import scala.language.existentials + import akka.actor.ActorRef -import io.gearpump.streaming.appmaster.TaskRegistry.TaskLocations -import io.gearpump.streaming.task.{TaskId, Subscriber} + import io.gearpump.TimeStamp import io.gearpump.cluster.appmaster.WorkerInfo import io.gearpump.cluster.scheduler.Resource +import io.gearpump.streaming.appmaster.TaskRegistry.TaskLocations +import io.gearpump.streaming.task.{Subscriber, TaskId} import io.gearpump.transport.HostPort -import scala.language.existentials - object AppMasterToExecutor { - case class LaunchTasks(taskId: List[TaskId], dagVersion: Int, processorDescription: ProcessorDescription, subscribers: List[Subscriber]) + case class LaunchTasks( + taskId: List[TaskId], dagVersion: Int, processorDescription: ProcessorDescription, + subscribers: List[Subscriber]) case object TasksLaunched /** * dagVersion, life, and subscribers will be changed on target task list. */ - case class ChangeTasks(taskId: List[TaskId], dagVersion: Int, life: LifeTime, subscribers: List[Subscriber]) + case class ChangeTasks( + taskId: List[TaskId], dagVersion: Int, life: LifeTime, subscribers: List[Subscriber]) case class TasksChanged(taskIds: List[TaskId]) - case class ChangeTask(taskId: TaskId, dagVersion: Int, life: LifeTime, subscribers: List[Subscriber]) + case class ChangeTask( + taskId: TaskId, dagVersion: Int, life: LifeTime, subscribers: List[Subscriber]) case class TaskChanged(taskId: TaskId, dagVersion: Int) @@ -52,7 +57,8 @@ object AppMasterToExecutor { case class TaskLocationsReceived(dagVersion: Int, executorId: ExecutorId) - case class TaskLocationsRejected(dagVersion: Int, executorId: ExecutorId, reason: String, ex: Throwable) + case class TaskLocationsRejected( + dagVersion: Int, executorId: ExecutorId, reason: String, ex: Throwable) case class StartAllTasks(dagVersion: Int) @@ -65,10 +71,11 @@ object AppMasterToExecutor { } object ExecutorToAppMaster { - case class RegisterExecutor(executor: ActorRef, executorId: Int, resource: Resource, worker : WorkerInfo) + case class RegisterExecutor( + executor: ActorRef, executorId: Int, resource: Resource, worker : WorkerInfo) - case class RegisterTask(taskId: TaskId, executorId : Int, task: HostPort) - case class UnRegisterTask(taskId: TaskId, executorId : Int) + case class RegisterTask(taskId: TaskId, executorId: Int, task: HostPort) + case class UnRegisterTask(taskId: TaskId, executorId: Int) case class MessageLoss(executorId: Int, taskId: TaskId, cause: String) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/Constants.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/Constants.scala b/streaming/src/main/scala/io/gearpump/streaming/Constants.scala index fa23807..becf31a 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/Constants.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/Constants.scala @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.gearpump.streaming object Constants { @@ -11,6 +29,9 @@ object Constants { val GEARPUMP_STREAMING_REGISTER_TASK_TIMEOUT_MS = "gearpump.streaming.register-task-timeout-ms" - val GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT = "gearpump.streaming.max-pending-message-count-per-connection" - val GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT = "gearpump.streaming.ack-once-every-message-count" + val GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT = + "gearpump.streaming.max-pending-message-count-per-connection" + + val GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT = + "gearpump.streaming.ack-once-every-message-count" } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/DAG.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/DAG.scala b/streaming/src/main/scala/io/gearpump/streaming/DAG.scala index a6b8cba..a73fd48 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/DAG.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/DAG.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,14 +18,16 @@ package io.gearpump.streaming -import io.gearpump.streaming.task.TaskId import io.gearpump.partitioner.PartitionerDescription +import io.gearpump.streaming.task.TaskId import io.gearpump.util.Graph /** - * DAG is wrapper for [[Graph]] for streaming applications. + * DAG is wrapper for [[io.gearpump.util.Graph]] for streaming applications. */ -case class DAG(version: Int, processors : Map[ProcessorId, ProcessorDescription], graph : Graph[ProcessorId, PartitionerDescription]) extends Serializable { +case class DAG(version: Int, processors : Map[ProcessorId, ProcessorDescription], + graph : Graph[ProcessorId, PartitionerDescription]) + extends Serializable { def isEmpty: Boolean = { processors.isEmpty @@ -46,15 +48,15 @@ case class DAG(version: Int, processors : Map[ProcessorId, ProcessorDescription] } object DAG { - def apply (graph : Graph[ProcessorDescription, PartitionerDescription], version: Int = 0) : DAG = { - val processors = graph.vertices.map{processorDescription => + def apply(graph: Graph[ProcessorDescription, PartitionerDescription], version: Int = 0): DAG = { + val processors = graph.vertices.map { processorDescription => (processorDescription.id, processorDescription) }.toMap - val dag = graph.mapVertex{ processor => + val dag = graph.mapVertex { processor => processor.id } new DAG(version, processors, dag) } - def empty() = apply(Graph.empty[ProcessorDescription, PartitionerDescription]) + def empty: DAG = apply(Graph.empty[ProcessorDescription, PartitionerDescription]) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala b/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala index 4eabcd2..8eb866d 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -75,7 +75,7 @@ class InitialAckRequestSerializer extends TaskMessageSerializer[InitialAckReques } } -class AckRequestSerializer extends TaskMessageSerializer[AckRequest]{ +class AckRequestSerializer extends TaskMessageSerializer[AckRequest] { val taskIdSerializer = new TaskIdSerializer override def getLength(obj: AckRequest): Int = taskIdSerializer.getLength(obj.taskId) + 6 http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala index b68054a..0ca4d92 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,20 +18,20 @@ package io.gearpump.streaming +import scala.language.implicitConversions +import scala.reflect.ClassTag + import akka.actor.ActorSystem -import io.gearpump.streaming.appmaster.AppMaster -import io.gearpump.streaming.task.Task + import io.gearpump.TimeStamp import io.gearpump.cluster._ import io.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject} -import io.gearpump.util.{LogUtil, Graph, ReferenceEqual} - -import scala.language.implicitConversions -import scala.reflect.ClassTag +import io.gearpump.streaming.appmaster.AppMaster +import io.gearpump.streaming.task.Task +import io.gearpump.util.{Graph, LogUtil, ReferenceEqual} /** * Processor is the blueprint for tasks. - * */ trait Processor[+T <: Task] extends ReferenceEqual { @@ -41,7 +41,7 @@ trait Processor[+T <: Task] extends ReferenceEqual { def parallelism: Int /** - * The custom [[UserConfig]], it is used to initialize a task in runtime. + * The custom [[io.gearpump.cluster.UserConfig]], it is used to initialize a task in runtime. */ def taskConf: UserConfig @@ -59,20 +59,29 @@ trait Processor[+T <: Task] extends ReferenceEqual { } object Processor { - def ProcessorToProcessorDescription(id: ProcessorId, processor: Processor[_ <: Task]): ProcessorDescription = { + def ProcessorToProcessorDescription(id: ProcessorId, processor: Processor[_ <: Task]) + : ProcessorDescription = { import processor._ ProcessorDescription(id, taskClass.getName, parallelism, description, taskConf) } - def apply[T<: Task](parallelism : Int, description: String = "", taskConf: UserConfig = UserConfig.empty)(implicit classtag: ClassTag[T]): DefaultProcessor[T] = { - new DefaultProcessor[T](parallelism, description, taskConf, classtag.runtimeClass.asInstanceOf[Class[T]]) + def apply[T<: Task]( + parallelism : Int, description: String = "", + taskConf: UserConfig = UserConfig.empty)(implicit classtag: ClassTag[T]) + : DefaultProcessor[T] = { + new DefaultProcessor[T](parallelism, description, taskConf, + classtag.runtimeClass.asInstanceOf[Class[T]]) } - def apply[T<: Task](taskClazz: Class[T], parallelism : Int, description: String, taskConf: UserConfig): DefaultProcessor[T] = { + def apply[T<: Task]( + taskClazz: Class[T], parallelism : Int, description: String, taskConf: UserConfig) + : DefaultProcessor[T] = { new DefaultProcessor[T](parallelism, description, taskConf, taskClazz) } - case class DefaultProcessor[T<: Task](parallelism : Int, description: String, taskConf: UserConfig, taskClass: Class[T]) extends Processor[T] { + case class DefaultProcessor[T<: Task]( + parallelism : Int, description: String, taskConf: UserConfig, taskClass: Class[T]) + extends Processor[T] { def withParallelism(parallel: Int): DefaultProcessor[T] = { new DefaultProcessor[T](parallel, description, taskConf, taskClass) @@ -93,7 +102,6 @@ object Processor { * * When input message's timestamp is beyond current processor's lifetime, * then it will not be processed by this processor. - * */ case class LifeTime(birth: TimeStamp, death: TimeStamp) { def contains(timestamp: TimeStamp): Boolean = { @@ -112,7 +120,9 @@ object LifeTime { /** * Represent a streaming application */ -class StreamApplication(override val name : String, val inputUserConfig: UserConfig, val dag: Graph[ProcessorDescription, PartitionerDescription]) +class StreamApplication( + override val name: String, val inputUserConfig: UserConfig, + val dag: Graph[ProcessorDescription, PartitionerDescription]) extends Application { require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges") @@ -137,19 +147,21 @@ object StreamApplication { private val hashPartitioner = new HashPartitioner() private val LOG = LogUtil.getLogger(getClass) - def apply[T <: Processor[Task], P <: Partitioner] (name : String, dag: Graph[T, P], userConfig: UserConfig): StreamApplication = { - import Processor._ + def apply[T <: Processor[Task], P <: Partitioner]( + name: String, dag: Graph[T, P], userConfig: UserConfig): StreamApplication = { + import io.gearpump.streaming.Processor._ if (dag.hasCycle()) { LOG.warn(s"Detected cycles in DAG of application $name!") } val indices = dag.topologicalOrderWithCirclesIterator.toList.zipWithIndex.toMap - val graph = dag.mapVertex {processor => + val graph = dag.mapVertex { processor => val updatedProcessor = ProcessorToProcessorDescription(indices(processor), processor) updatedProcessor }.mapEdge { (node1, edge, node2) => - PartitionerDescription(new PartitionerObject(Option(edge).getOrElse(StreamApplication.hashPartitioner))) + PartitionerDescription(new PartitionerObject( + Option(edge).getOrElse(StreamApplication.hashPartitioner))) } new StreamApplication(name, userConfig, graph) }
