This is an automated email from the ASF dual-hosted git repository.

ningyougang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 1753946  Implement FCPSchedulerServer (#5030)
1753946 is described below

commit 1753946ac16b91b2d2a3fc55ab215b14e71c2b39
Author: ningyougang <[email protected]>
AuthorDate: Tue Feb 2 16:15:23 2021 +0800

    Implement FCPSchedulerServer (#5030)
---
 ...edulerServer.scala => FPCSchedulerServer.scala} |  43 ++++--
 .../openwhisk/core/scheduler/Scheduler.scala       |  15 +-
 tests/build.gradle                                 |   3 +
 .../core/scheduler/FPCSchedulerServerTests.scala   | 153 +++++++++++++++++++++
 4 files changed, 192 insertions(+), 22 deletions(-)

diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala
similarity index 64%
rename from core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
rename to core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala
index 841b139..874362f 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala
@@ -18,12 +18,14 @@
 package org.apache.openwhisk.core.scheduler
 
 import akka.actor.ActorSystem
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
 import akka.http.scaladsl.model.StatusCodes
 import akka.http.scaladsl.model.headers.BasicHttpCredentials
 import akka.http.scaladsl.server.Route
 import org.apache.openwhisk.common.{Logging, TransactionId}
 import org.apache.openwhisk.http.BasicRasService
 import org.apache.openwhisk.http.ErrorResponse.terminate
+import spray.json.DefaultJsonProtocol._
 import spray.json._
 
 import scala.concurrent.ExecutionContext
@@ -32,7 +34,7 @@ import scala.concurrent.ExecutionContext
  * Implements web server to handle certain REST API calls.
  * Currently provides a health ping route, only.
  */
-class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)(
+class FPCSchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)(
   implicit val ec: ExecutionContext,
   implicit val actorSystem: ActorSystem,
   implicit val logger: Logging)
@@ -41,10 +43,28 @@ class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPa
   override def routes(implicit transid: TransactionId): Route = {
     super.routes ~ extractCredentials {
       case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
-        (path("disable") & post) {
+        (path("state") & get) {
+          complete {
+            scheduler.getState.map {
+              case (list, creationCount) =>
+                (list
+                  .map(scheduler => scheduler._1.asString -> scheduler._2.toString)
+                  .toMap
+                  ++ Map("creationCount" -> creationCount.toString)).toJson.asJsObject
+            }
+          }
+        } ~ (path("disable") & post) {
           logger.warn(this, "Scheduler is disabled")
           scheduler.disable()
           complete("scheduler disabled")
+        } ~ (path(FPCSchedulerServer.queuePathPrefix / "total") & get) {
+          complete {
+            scheduler.getQueueSize.map(_.toString)
+          }
+        } ~ (path(FPCSchedulerServer.queuePathPrefix / "status") & get) {
+          complete {
+            scheduler.getQueueStatusData.map(s => s.toJson)
+          }
         }
       case _ =>
         implicit val jsonPrettyResponsePrinter = PrettyPrinter
@@ -53,21 +73,16 @@ class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPa
   }
 }
 
-object SchedulerServer {
+object FPCSchedulerServer {
 
-  val schedulerUsername = {
-    val source = scala.io.Source.fromFile("/conf/schedulerauth.username")
-    try source.mkString.replaceAll("\r|\n", "")
-    finally source.close()
-  }
-  val schedulerPassword = {
-    val source = scala.io.Source.fromFile("/conf/schedulerauth.password")
-    try source.mkString.replaceAll("\r|\n", "")
-    finally source.close()
-  }
+  // TODO: TBD, after FPCScheduler is ready, can read the credentials from pureconfig
+  val schedulerUsername = "admin"
+  val schedulerPassword = "admin"
+
+  val queuePathPrefix = "queue"
 
   def instance(scheduler: SchedulerCore)(implicit ec: ExecutionContext,
                                          actorSystem: ActorSystem,
                                          logger: Logging): BasicRasService =
-    new SchedulerServer(scheduler, schedulerUsername, schedulerPassword)
+    new FPCSchedulerServer(scheduler, schedulerUsername, schedulerPassword)
 }
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
index 9fc793b..6bb4311 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -81,21 +81,20 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
   val durationChecker = "" // TODO: TBD
 
   override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
-    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD, after etcdClient is ready, can implement it
   }
 
   override def getQueueSize: Future[Int] = {
-    Future.successful(0) // TODO: TBD
+    Future.successful(0) // TODO: TBD, after queueManager is ready, can implement it
   }
 
-  override def getQueueStatusData: Future[List[String]] = {
-    Future.successful(List("")) // TODO: TBD
+  override def getQueueStatusData: Future[List[StatusData]] = {
+    Future.successful(List(StatusData("ns", "fqn", 0, "Running", "data"))) // TODO: TBD, after queueManager is ready, can implement it
   }
 
-  // other components don't need to shutdown gracefully
   override def disable(): Unit = {
     logging.info(this, s"Gracefully shutting down the scheduler")
-    // TODO: TBD, gracefully shut down the container manager and queue manager
+    // TODO: TBD, after containerManager and queueManager are ready, can implement it
   }
 
   private def getUserLimit(invocationNamespace: String): Future[Int] = {
@@ -160,7 +159,7 @@ trait SchedulerCore {
 
   def getQueueSize: Future[Int]
 
-  def getQueueStatusData: Future[List[String]] // TODO: Change to the real data class other than just string
+  def getQueueStatusData: Future[List[StatusData]]
 
   def disable(): Unit
 }
@@ -253,7 +252,7 @@ object Scheduler {
         val httpsConfig =
           if (Scheduler.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https")) else None
 
-        BasicHttpService.startHttpService(SchedulerServer.instance(scheduler).route, port, httpsConfig)(
+        BasicHttpService.startHttpService(FPCSchedulerServer.instance(scheduler).route, port, httpsConfig)(
           actorSystem,
           ActorMaterializer.create(actorSystem))
 
diff --git a/tests/build.gradle b/tests/build.gradle
index 57c3bd7..49fac12 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -325,6 +325,7 @@ task copyMeasurementFiles() {
     doLast{
         Project common = project(":common:scala")
         Project controller = project(":core:controller")
+        Project scheduler = project(":core:scheduler")
         Project invoker = project(":core:invoker")
 
         Properties wskProps = loadWhiskProps()
@@ -335,6 +336,8 @@ task copyMeasurementFiles() {
 
         copyAndRenameMeasurementFile(covLogs, 'controller', "common", common)
         copyAndRenameMeasurementFile(covLogs, 'controller', "controller", controller)
+        copyAndRenameMeasurementFile(covLogs, 'scheduler', "common", common)
+        copyAndRenameMeasurementFile(covLogs, 'scheduler', "scheduler", scheduler)
         copyAndRenameMeasurementFile(covLogs, 'invoker', "common", common)
         copyAndRenameMeasurementFile(covLogs, 'invoker', "invoker", invoker)
     }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala
new file mode 100644
index 0000000..0dab4f4
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.scheduler
+
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.model.StatusCodes._
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.server.Route
+import akka.http.scaladsl.testkit.ScalatestRouteTest
+import common.StreamLogging
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.connector.StatusData
+import org.apache.openwhisk.core.entity.SchedulerInstanceId
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers}
+import spray.json.DefaultJsonProtocol._
+import spray.json._
+
+import scala.concurrent.Future
+
+/**
+ * Tests FPCSchedulerServer API.
+ */
+@RunWith(classOf[JUnitRunner])
+class FPCSchedulerServerTests
+    extends FlatSpec
+    with BeforeAndAfterEach
+    with BeforeAndAfterAll
+    with ScalatestRouteTest
+    with Matchers
+    with StreamLogging
+    with MockFactory {
+
+  def transid() = TransactionId("tid")
+
+  val systemUsername = "username"
+  val systemPassword = "password"
+
+  val queues = List((SchedulerInstanceId("0"), 2), (SchedulerInstanceId("1"), 3))
+  val creationCount = 1
+  val testQueueSize = 2
+  val statusDatas = List(
+    StatusData("testns1", "testaction1", 10, "Running", "RunningData"),
+    StatusData("testns2", "testaction2", 5, "Running", "RunningData"))
+
+  // Create scheduler
+  val scheduler = new TestScheduler(queues, creationCount, testQueueSize, statusDatas)
+  val server = new FPCSchedulerServer(scheduler, systemUsername, systemPassword)
+
+  override protected def afterEach(): Unit = scheduler.reset()
+
+  /** FPCSchedulerServer API tests */
+  behavior of "FPCSchedulerServer API"
+
+  // POST /disable
+  it should "disable scheduler" in {
+    implicit val tid = transid()
+    val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+    Post(s"/disable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(OK)
+      scheduler.shutdownCount shouldBe 1
+    }
+  }
+
+  // GET /state
+  it should "get scheduler state" in {
+    implicit val tid = transid()
+    val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+    Get(s"/state") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(OK)
+      responseAs[JsObject] shouldBe (queues.map(s => s._1.asString -> s._2.toString).toMap ++ Map(
+        "creationCount" -> creationCount.toString)).toJson
+    }
+  }
+
+  // GET /queue/total
+  it should "get total queue" in {
+    implicit val tid = transid()
+    val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+    Get(s"/queue/total") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(OK)
+      responseAs[String] shouldBe testQueueSize.toString
+    }
+  }
+
+  // GET /queue/status
+  it should "get all queue status" in {
+    implicit val tid = transid()
+    val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
+    Get(s"/queue/status") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(OK)
+      responseAs[List[JsObject]] shouldBe statusDatas.map(_.toJson)
+    }
+  }
+
+  // POST /disable with invalid credential
+  it should "not call scheduler api with invalid credential" in {
+    implicit val tid = transid()
+    val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
+    Post(s"/disable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(Unauthorized)
+      scheduler.shutdownCount shouldBe 0
+    }
+  }
+
+  // POST /disable with empty credential
+  it should "not call scheduler api with empty credential" in {
+    implicit val tid = transid()
+    Post(s"/disable") ~> Route.seal(server.routes(tid)) ~> check {
+      status should be(Unauthorized)
+      scheduler.shutdownCount shouldBe 0
+    }
+  }
+
+}
+
+class TestScheduler(schedulerStates: List[(SchedulerInstanceId, Int)],
+                    creationCount: Int,
+                    queueSize: Int,
+                    statusDatas: List[StatusData])
+    extends SchedulerCore {
+  var shutdownCount = 0
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] =
+    Future.successful(schedulerStates, creationCount)
+
+  override def getQueueSize: Future[Int] = Future.successful(queueSize)
+
+  override def getQueueStatusData: Future[List[StatusData]] = Future.successful(statusDatas)
+
+  override def disable(): Unit = shutdownCount += 1
+
+  def reset(): Unit = {
+    shutdownCount = 0
+  }
+}

Reply via email to