style95 commented on a change in pull request #4983:
URL: https://github.com/apache/openwhisk/pull/4983#discussion_r494661944



##########
File path: 
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, 
CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, 
NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: 
SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, 
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, 
logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) 
else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, 
context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation 
$activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = 
identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: 
${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: 
$invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: 
$invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD

Review comment:
       This is a factory to create memory queues.
   In the new architecture, each action is given its own dedicated queue.
   

##########
File path: 
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, 
CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, 
NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: 
SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, 
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, 
logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) 
else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, 
context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation 
$activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = 
identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: 
${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: 
$invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: 
$invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD
+
+  val schedulerConsumer = msgProvider.getConsumer(
+    config,
+    s"scheduler${schedulerId.asString}",
+    s"scheduler${schedulerId.asString}",
+    500, // TODO: to be updated with maxPeek variable
+    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+
+  implicit val trasnid = TransactionId.containerCreation
+
+  val queueManager = "" // TODO: TBD

Review comment:
       This is one of the major components which take charge of managing queues 
and coordinating requests among the scheduler, controllers, and invokers.

##########
File path: 
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, 
CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, 
NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: 
SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, 
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, 
logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) 
else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, 
context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation 
$activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = 
identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: 
${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: 
$invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: 
$invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD

Review comment:
       This component is in charge of storing data to ETCD.
   

##########
File path: 
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, 
CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, 
NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: 
SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, 
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, 
logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) 
else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, 
context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation 
$activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = 
identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: 
${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: 
$invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: 
$invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD
+
+  val schedulerConsumer = msgProvider.getConsumer(
+    config,
+    s"scheduler${schedulerId.asString}",
+    s"scheduler${schedulerId.asString}",
+    500, // TODO: to be updated with maxPeek variable
+    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+
+  implicit val trasnid = TransactionId.containerCreation
+
+  val queueManager = "" // TODO: TBD
+
+  //val serviceHandlers: HttpRequest => Future[HttpResponse] = 
ActivationServiceHandler.apply(ActivationServiceImpl())  TODO: TBD
+}
+
+case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = 
None, displayedName: Option[String] = None)
+
+trait SchedulerCore {
+  def getState: Future[(List[(SchedulerInstanceId, Int)], Int)]
+
+  def getQueueSize: Future[Int]
+
+  def getQueueStatusData: Future[List[String]] // TODO: Change to the real 
data class other than just string
+
+  def shutdown(): Unit
+}
+
+object Scheduler {
+
+  protected val protocol = 
loadConfigOrThrow[String]("whisk.scheduler.protocol")
+
+  def requiredProperties =
+    Map(
+      servicePort -> 8080.toString,
+      schedulerHost -> null,
+      schedulerAkkaPort -> null,

Review comment:
       The scheduler has two ports, one for akka-remote and the other for 
akka-grpc.
   

##########
File path: 
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, 
CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, 
NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: 
SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, 
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, 
logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) 
else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, 
context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation 
$activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = 
identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: 
${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: 
$invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: 
$invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD
+
+  val memoryQueueFactory = "" // TODO: TBD
+
+  val schedulerConsumer = msgProvider.getConsumer(
+    config,
+    s"scheduler${schedulerId.asString}",
+    s"scheduler${schedulerId.asString}",
+    500, // TODO: to be updated with maxPeek variable
+    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+
+  implicit val trasnid = TransactionId.containerCreation
+
+  val queueManager = "" // TODO: TBD
+
+  //val serviceHandlers: HttpRequest => Future[HttpResponse] = 
ActivationServiceHandler.apply(ActivationServiceImpl())  TODO: TBD
+}
+
+case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = 
None, displayedName: Option[String] = None)
+
+trait SchedulerCore {
+  def getState: Future[(List[(SchedulerInstanceId, Int)], Int)]
+
+  def getQueueSize: Future[Int]
+
+  def getQueueStatusData: Future[List[String]] // TODO: Change to the real 
data class other than just string
+
+  def shutdown(): Unit
+}
+
+object Scheduler {
+
+  protected val protocol = 
loadConfigOrThrow[String]("whisk.scheduler.protocol")
+
+  def requiredProperties =
+    Map(
+      servicePort -> 8080.toString,
+      schedulerHost -> null,
+      schedulerAkkaPort -> null,
+      schedulerRpcPort -> null,
+      WhiskConfig.actionInvokePerMinuteLimit -> null,
+      WhiskConfig.actionInvokeConcurrentLimit -> null,
+      WhiskConfig.triggerFirePerMinuteLimit -> null) ++
+      kafkaHosts ++
+      zookeeperHosts ++
+      wskApiHost ++
+      ExecManifest.requiredProperties
+
+  def initKamon(instance: SchedulerInstanceId): Unit = {
+    // Replace the hostname of the scheduler to the assigned id of the 
scheduler.
+    val newKamonConfig = Kamon.config
+      .withValue("kamon.environment.host", 
ConfigValueFactory.fromAnyRef(s"scheduler${instance.asString}"))
+    Kamon.init(newKamonConfig)
+  }
+
+  def main(args: Array[String]): Unit = {
+    implicit val ec = 
ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+    implicit val actorSystem: ActorSystem =
+      ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = 
Some(ec))
+    implicit val materializer = ActorMaterializer.create(actorSystem)
+
+    implicit val logger = new 
AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
+
+    // Prepare Kamon shutdown
+    
CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate,
 "shutdownKamon") { () =>
+      logger.info(this, s"Shutting down Kamon with coordinated shutdown")
+      Kamon.stopModules().map(_ => Done)
+    }
+
+    // extract configuration data from the environment
+    implicit val config = new WhiskConfig(requiredProperties)
+    val port = config.servicePort.toInt
+    val host = config.schedulerHost
+    val rpcPort = config.schedulerRpcPort.toInt
+    val akkaPort = config.schedulerAkkaPort.toInt
+
+    // if deploying multiple instances (scale out), must pass the instance 
number as the
+    require(args.length >= 1, "controller instance required")
+    val instanceId = SchedulerInstanceId(args(0))
+
+    initKamon(instanceId)
+
+    def abort(message: String) = {
+      logger.error(this, message)
+      actorSystem.terminate()
+      Await.result(actorSystem.whenTerminated, 30.seconds)
+      sys.exit(1)
+    }
+
+    if (!config.isValid) {
+      abort("Bad configuration, cannot start.")
+    }
+
+    val execManifest = ExecManifest.initialize(config)
+    if (execManifest.isFailure) {
+      logger.error(this, s"Invalid runtimes manifest: 
${execManifest.failed.get}")
+      abort("Bad configuration, cannot start.")
+    }
+
+    val msgProvider = SpiLoader.get[MessagingProvider]
+
+    Seq(
+      ("scheduler" + instanceId.asString, "actions", 
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
+      ("creationAck" + instanceId.asString, "creationAck", 
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)))

Review comment:
       The final goal is to remove Kafka from the critical path, but it still 
relies on Kafka as of now.
   Now activation messages are sent to the scheduler via `schedulerN` topic and 
container creation messages are sent to invoker via `invokerN` topic.
   

##########
File path: 
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
##########
@@ -0,0 +1,269 @@
+package org.apache.openwhisk.core.scheduler
+import akka.Done
+import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, 
CoordinatedShutdown}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import kamon.Kamon
+import org.apache.openwhisk.common.Https.HttpsConfig
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
+import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ActivationStoreProvider, 
NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.BasicHttpService
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig.loadConfigOrThrow
+import spray.json.{DefaultJsonProtocol, _}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+import pureconfig.generic.auto._
+
+class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: 
SchedulerEndpoints)(
+  implicit config: WhiskConfig,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  logging: Logging)
+    extends SchedulerCore {
+  implicit val ec = actorSystem.dispatcher
+  private val authStore = WhiskAuthStore.datastore()
+
+  val msgProvider = SpiLoader.get[MessagingProvider]
+  val producer = msgProvider.getProducer(config, 
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  val maxPeek = "" // TODO: TBD
+  val etcdClient = "" // TODO: TBD
+  val watcherService = "" // TODO: TBD
+  val leaseService = "" // TODO: TBD
+
+  implicit val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, 
logging)
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) 
else None
+    new MessagingActiveAck(producer, schedulerId, sender)
+  }
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, 
context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.store(activation, context)(tid, notifier = None).andThen {
+      case Success(doc) => logging.info(this, s"save ${doc} successfully")
+      case Failure(t)   => logging.error(this, s"failed to save activation 
$activation, error: ${t.getMessage}")
+    }
+  }
+  val durationCheckerProvider = "" // TODO: TBD
+  val durationChecker = "" // TODO: TBD
+
+  override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
+    Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD
+  }
+
+  override def getQueueSize: Future[Int] = {
+    Future.successful(0) // TODO: TBD
+  }
+
+  override def getQueueStatusData: Future[List[String]] = {
+    Future.successful(List("")) // TODO: TBD
+  }
+
+  // other components don't need to shutdown gracefully
+  override def shutdown(): Unit = {
+    logging.info(this, s"Gracefully shutting down the scheduler")
+    // TODO: TBD, gracefully shut down the container manager and queue manager
+  }
+
+  private def getUserLimit(invocationNamespace: String): Future[Int] = {
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(trasnid)
+      .map { identity =>
+        val limit = 
identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt)
+        logging.debug(this, s"limit for ${invocationNamespace}: 
${limit}")(trasnid)
+        limit
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: 
$invocationNamespace")(trasnid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: 
$invocationNamespace")(trasnid)
+      }
+  }
+
+  private val etcdWorkerFactory = "" // TODO: TBD
+
+  val dataManagementService = "" // TODO: TBD
+
+  val creationJobManagerFactory = "" // TODO: TBD
+
+  val containerManager = "" // TODO: TBD

Review comment:
       This component is responsible for creating containers for a given action.
   It relies on the `creationJobManager` to manage the container creation job.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to