This is an automated email from the ASF dual-hosted git repository.
ningyougang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 1753946 Implement FCPSchedulerServer (#5030)
1753946 is described below
commit 1753946ac16b91b2d2a3fc55ab215b14e71c2b39
Author: ningyougang <[email protected]>
AuthorDate: Tue Feb 2 16:15:23 2021 +0800
Implement FCPSchedulerServer (#5030)
---
...edulerServer.scala => FPCSchedulerServer.scala} | 43 ++++--
.../openwhisk/core/scheduler/Scheduler.scala | 15 +-
tests/build.gradle | 3 +
.../core/scheduler/FPCSchedulerServerTests.scala | 153 +++++++++++++++++++++
4 files changed, 192 insertions(+), 22 deletions(-)
diff --git
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala
similarity index 64%
rename from
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
rename to
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala
index 841b139..874362f 100644
---
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
+++
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala
@@ -18,12 +18,14 @@
package org.apache.openwhisk.core.scheduler
import akka.actor.ActorSystem
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.server.Route
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.http.BasicRasService
import org.apache.openwhisk.http.ErrorResponse.terminate
+import spray.json.DefaultJsonProtocol._
import spray.json._
import scala.concurrent.ExecutionContext
@@ -32,7 +34,7 @@ import scala.concurrent.ExecutionContext
* Implements web server to handle certain REST API calls.
* Currently provides a health ping route, only.
*/
-class SchedulerServer(scheduler: SchedulerCore, systemUsername: String,
systemPassword: String)(
+class FPCSchedulerServer(scheduler: SchedulerCore, systemUsername: String,
systemPassword: String)(
implicit val ec: ExecutionContext,
implicit val actorSystem: ActorSystem,
implicit val logger: Logging)
@@ -41,10 +43,28 @@ class SchedulerServer(scheduler: SchedulerCore,
systemUsername: String, systemPa
override def routes(implicit transid: TransactionId): Route = {
super.routes ~ extractCredentials {
case Some(BasicHttpCredentials(username, password)) if username ==
systemUsername && password == systemPassword =>
- (path("disable") & post) {
+ (path("state") & get) {
+ complete {
+ scheduler.getState.map {
+ case (list, creationCount) =>
+ (list
+ .map(scheduler => scheduler._1.asString ->
scheduler._2.toString)
+ .toMap
+ ++ Map("creationCount" ->
creationCount.toString)).toJson.asJsObject
+ }
+ }
+ } ~ (path("disable") & post) {
logger.warn(this, "Scheduler is disabled")
scheduler.disable()
complete("scheduler disabled")
+ } ~ (path(FPCSchedulerServer.queuePathPrefix / "total") & get) {
+ complete {
+ scheduler.getQueueSize.map(_.toString)
+ }
+ } ~ (path(FPCSchedulerServer.queuePathPrefix / "status") & get) {
+ complete {
+ scheduler.getQueueStatusData.map(s => s.toJson)
+ }
}
case _ =>
implicit val jsonPrettyResponsePrinter = PrettyPrinter
@@ -53,21 +73,16 @@ class SchedulerServer(scheduler: SchedulerCore,
systemUsername: String, systemPa
}
}
-object SchedulerServer {
+object FPCSchedulerServer {
- val schedulerUsername = {
- val source = scala.io.Source.fromFile("/conf/schedulerauth.username")
- try source.mkString.replaceAll("\r|\n", "")
- finally source.close()
- }
- val schedulerPassword = {
- val source = scala.io.Source.fromFile("/conf/schedulerauth.password")
- try source.mkString.replaceAll("\r|\n", "")
- finally source.close()
- }
+ // TODO: TBD, after FPCScheduler is ready, can read the credentials from
pureconfig
+ val schedulerUsername = "admin"
+ val schedulerPassword = "admin"
+
+ val queuePathPrefix = "queue"
def instance(scheduler: SchedulerCore)(implicit ec: ExecutionContext,
actorSystem: ActorSystem,
logger: Logging): BasicRasService =
- new SchedulerServer(scheduler, schedulerUsername, schedulerPassword)
+ new FPCSchedulerServer(scheduler, schedulerUsername, schedulerPassword)
}
diff --git
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
index 9fc793b..6bb4311 100644
---
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
+++
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -81,21 +81,20 @@ class Scheduler(schedulerId: SchedulerInstanceId,
schedulerEndpoints: SchedulerE
val durationChecker = "" // TODO: TBD
override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
- Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+ Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD, after
etcdClient is ready, can implement it
}
override def getQueueSize: Future[Int] = {
- Future.successful(0) // TODO: TBD
+ Future.successful(0) // TODO: TBD, after queueManager is ready, can
implement it
}
- override def getQueueStatusData: Future[List[String]] = {
- Future.successful(List("")) // TODO: TBD
+ override def getQueueStatusData: Future[List[StatusData]] = {
+ Future.successful(List(StatusData("ns", "fqn", 0, "Running", "data"))) //
TODO: TBD, after queueManager is ready, can implement it
}
- // other components don't need to shutdown gracefully
override def disable(): Unit = {
logging.info(this, s"Gracefully shutting down the scheduler")
- // TODO: TBD, gracefully shut down the container manager and queue manager
+ // TODO: TBD, after containerManager and queueManager are ready, can
implement it
}
private def getUserLimit(invocationNamespace: String): Future[Int] = {
@@ -160,7 +159,7 @@ trait SchedulerCore {
def getQueueSize: Future[Int]
- def getQueueStatusData: Future[List[String]] // TODO: Change to the real
data class other than just string
+ def getQueueStatusData: Future[List[StatusData]]
def disable(): Unit
}
@@ -253,7 +252,7 @@ object Scheduler {
val httpsConfig =
if (Scheduler.protocol == "https")
Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https")) else None
-
BasicHttpService.startHttpService(SchedulerServer.instance(scheduler).route,
port, httpsConfig)(
+
BasicHttpService.startHttpService(FPCSchedulerServer.instance(scheduler).route,
port, httpsConfig)(
actorSystem,
ActorMaterializer.create(actorSystem))
diff --git a/tests/build.gradle b/tests/build.gradle
index 57c3bd7..49fac12 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -325,6 +325,7 @@ task copyMeasurementFiles() {
doLast{
Project common = project(":common:scala")
Project controller = project(":core:controller")
+ Project scheduler = project(":core:scheduler")
Project invoker = project(":core:invoker")
Properties wskProps = loadWhiskProps()
@@ -335,6 +336,8 @@ task copyMeasurementFiles() {
copyAndRenameMeasurementFile(covLogs, 'controller', "common", common)
copyAndRenameMeasurementFile(covLogs, 'controller', "controller",
controller)
+ copyAndRenameMeasurementFile(covLogs, 'scheduler', "common", common)
+ copyAndRenameMeasurementFile(covLogs, 'scheduler', "scheduler",
scheduler)
copyAndRenameMeasurementFile(covLogs, 'invoker', "common", common)
copyAndRenameMeasurementFile(covLogs, 'invoker', "invoker", invoker)
}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala
new file mode 100644
index 0000000..0dab4f4
--- /dev/null
+++
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.model.StatusCodes._
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import akka.http.scaladsl.testkit.ScalatestRouteTest
+import common.StreamLogging
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.connector.StatusData
+import org.apache.openwhisk.core.entity.SchedulerInstanceId
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec,
Matchers}
+import spray.json.DefaultJsonProtocol._
+import spray.json._
+
+import scala.concurrent.Future
+
+/**
+ * Tests SchedulerServer API.
+ */
+@RunWith(classOf[JUnitRunner])
+class FPCSchedulerServerTests
+ extends FlatSpec
+ with BeforeAndAfterEach
+ with BeforeAndAfterAll
+ with ScalatestRouteTest
+ with Matchers
+ with StreamLogging
+ with MockFactory {
+
+ def transid() = TransactionId("tid")
+
+ val systemUsername = "username"
+ val systemPassword = "password"
+
+ val queues = List((SchedulerInstanceId("0"), 2), (SchedulerInstanceId("1"),
3))
+ val creationCount = 1
+ val testQueueSize = 2
+ val statusDatas = List(
+ StatusData("testns1", "testaction1", 10, "Running", "RunningData"),
+ StatusData("testns2", "testaction2", 5, "Running", "RunningData"))
+
+ // Create scheduler
+ val scheduler = new TestScheduler(queues, creationCount, testQueueSize,
statusDatas)
+ val server = new FPCSchedulerServer(scheduler, systemUsername,
systemPassword)
+
+ override protected def afterEach(): Unit = scheduler.reset()
+
+ /** FPCSchedulerServer API tests */
+ behavior of "FPCSchedulerServer API"
+
+ // POST /disable
+ it should "disable scheduler" in {
+ implicit val tid = transid()
+ val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+ Post(s"/disable") ~> addCredentials(validCredentials) ~>
Route.seal(server.routes(tid)) ~> check {
+ status should be(OK)
+ scheduler.shutdownCount shouldBe 1
+ }
+ }
+
+ // GET /state
+ it should "get scheduler state" in {
+ implicit val tid = transid()
+ val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+ Get(s"/state") ~> addCredentials(validCredentials) ~>
Route.seal(server.routes(tid)) ~> check {
+ status should be(OK)
+ responseAs[JsObject] shouldBe (queues.map(s => s._1.asString ->
s._2.toString).toMap ++ Map(
+ "creationCount" -> creationCount.toString)).toJson
+ }
+ }
+
+ // GET /queue/total
+ it should "get total queue" in {
+ implicit val tid = transid()
+ val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+ Get(s"/queue/total") ~> addCredentials(validCredentials) ~>
Route.seal(server.routes(tid)) ~> check {
+ status should be(OK)
+ responseAs[String] shouldBe testQueueSize.toString
+ }
+ }
+
+ // GET /queue/status
+ it should "get all queue status" in {
+ implicit val tid = transid()
+ val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+ Get(s"/queue/status") ~> addCredentials(validCredentials) ~>
Route.seal(server.routes(tid)) ~> check {
+ status should be(OK)
+ responseAs[List[JsObject]] shouldBe statusDatas.map(_.toJson)
+ }
+ }
+
+ // POST /disable with invalid credential
+ it should "not call scheduler api with invalid credential" in {
+ implicit val tid = transid()
+ val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
+ Post(s"/disable") ~> addCredentials(invalidCredentials) ~>
Route.seal(server.routes(tid)) ~> check {
+ status should be(Unauthorized)
+ scheduler.shutdownCount shouldBe 0
+ }
+ }
+
+ // POST /disable with empty credential
+ it should "not call scheduler api with empty credential" in {
+ implicit val tid = transid()
+ Post(s"/disable") ~> Route.seal(server.routes(tid)) ~> check {
+ status should be(Unauthorized)
+ scheduler.shutdownCount shouldBe 0
+ }
+ }
+
+}
+
+class TestScheduler(schedulerStates: List[(SchedulerInstanceId, Int)],
+ creationCount: Int,
+ queueSize: Int,
+ statusDatas: List[StatusData])
+ extends SchedulerCore {
+ var shutdownCount = 0
+
+ override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] =
+ Future.successful(schedulerStates, creationCount)
+
+ override def getQueueSize: Future[Int] = Future.successful(queueSize)
+
+ override def getQueueStatusData: Future[List[StatusData]] =
Future.successful(statusDatas)
+
+ override def disable(): Unit = shutdownCount += 1
+
+ def reset(): Unit = {
+ shutdownCount = 0
+ }
+}