This is an automated email from the ASF dual-hosted git repository.

ningyougang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new e172168  [New Scheduler] Implement FPCInvokerReactive (#5125)
e172168 is described below

commit e172168fc5a55ba0c8443adbc91629291a1ca321
Author: ningyougang <[email protected]>
AuthorDate: Thu Jan 13 16:17:21 2022 +0800

    [New Scheduler] Implement FPCInvokerReactive (#5125)
    
    * Implement FPCInvokerReactive
    
    * Fix review points
    
    * Remove unnecessary code
---
 ansible/group_vars/all                             |   2 +
 .../org/apache/openwhisk/common/Logging.scala      |   2 +
 .../org/apache/openwhisk/core/WhiskConfig.scala    |   2 +
 .../openwhisk/core/ack/HealthActionAck.scala       |  45 ++
 .../apache/openwhisk/core/entity/Identity.scala    |   6 +-
 core/invoker/src/main/resources/application.conf   |   1 +
 .../core/containerpool/ContainerProxy.scala        |   4 +-
 .../core/invoker/FPCInvokerReactive.scala          | 481 +++++++++++++++++++++
 .../apache/openwhisk/core/invoker/Invoker.scala    |   1 +
 .../openwhisk/core/invoker/InvokerReactive.scala   |   4 +
 tests/src/test/resources/application.conf.j2       |   4 +-
 .../test/FunctionPullingContainerProxyTests.scala  |   2 +-
 .../invoker/test/DefaultInvokerServerTests.scala   |   4 +
 .../core/invoker/test/FPCInvokerServerTests.scala  |   4 +
 14 files changed, 557 insertions(+), 5 deletions(-)

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index bdd1bec..0f9b107 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -465,6 +465,8 @@ etcd_connect_string: "{% set ret = [] %}\
 
 scheduler:
   protocol: "{{ scheduler_protocol | default('http') }}"
+  grpc:
+    tls: "{{ scheduler_grpc_tls | default(false) }}"
   maxPeek: "{{ scheduler_max_peek | default(128) }}"
   queueManager:
     maxSchedulingTime: "{{ scheduler_maxSchedulingTime | default('20 second') 
}}"
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index e1859de..5bbf43a 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -510,6 +510,8 @@ object LoggingMarkers {
         "initiator" -> invocationNamespace,
         "namespace" -> namespace,
         "action" -> action))(MeasurementUnit.none)
+  def INVOKER_CONTAINER_CREATE(action: String, state: String) =
+    LogMarkerToken(invoker, "creation", counter, None, Map("action" -> action, 
"state" -> state))(MeasurementUnit.none)
   val INVOKER_CONTAINER_HEALTH = LogMarkerToken(invoker, "containerHealth", 
start)(MeasurementUnit.time.milliseconds)
   val INVOKER_CONTAINER_HEALTH_FAILED_WARM =
     LogMarkerToken(invoker, "containerHealthFailed", counter, Some("warm"), 
Map("containerState" -> "warm"))(
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 484da2e..e50fbb4 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -243,6 +243,7 @@ object ConfigKeys {
   val runtimesRegistry = s"$containerFactory.runtimes-registry"
   val userImagesRegistry = s"$containerFactory.user-images-registry"
   val containerPool = "whisk.container-pool"
+  val containerCreationMaxPeek = "whisk.invoker.container-creation.max-peek"
   val blacklist = "whisk.blacklist"
 
   val kubernetes = "whisk.kubernetes"
@@ -294,6 +295,7 @@ object ConfigKeys {
 
   val azBlob = "whisk.azure-blob"
 
+  val schedulerGrpcService = "whisk.scheduler.grpc"
   val schedulerMaxPeek = "whisk.scheduler.max-peek"
   val schedulerQueue = "whisk.scheduler.queue"
   val schedulerQueueManager = "whisk.scheduler.queue-manager"
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/ack/HealthActionAck.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/HealthActionAck.scala
new file mode 100644
index 0000000..83c4acd
--- /dev/null
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/HealthActionAck.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.ack
+
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, 
MessageProducer}
+import org.apache.openwhisk.core.entity.{ControllerInstanceId, UUID, 
WhiskActivation}
+import spray.json.DefaultJsonProtocol._
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+ * An [[ActiveAck]] that sends nothing: it only logs the outcome of the invocation and
+ * completes immediately. The given producer is accepted but not used.
+ */
+class HealthActionAck(producer: MessageProducer)(implicit logging: Logging, ec: ExecutionContext) extends ActiveAck {
+
+  /**
+   * Logs the outcome of the activation.
+   *
+   * Container errors and whisk errors are logged at error level together with the action path
+   * taken from the activation annotations ("unknown_path" when the annotation is missing).
+   * Every other outcome is logged at debug level as a successful invocation.
+   *
+   * @return an already completed future
+   */
+  override def apply(tid: TransactionId,
+                     activationResult: WhiskActivation,
+                     blockingInvoke: Boolean,
+                     controllerInstance: ControllerInstanceId,
+                     userId: UUID,
+                     acknowledegment: AcknowledegmentMessage): Future[Any] = {
+    implicit val transid: TransactionId = tid
+
+    val response = activationResult.response
+    if (response.isContainerError || response.isWhiskError) {
+      val actionPath =
+        activationResult.annotations.getAs[String](WhiskActivation.pathAnnotation).getOrElse("unknown_path")
+      logging.error(this, s"Failed to invoke action $actionPath, error: ${response.toString}")
+    } else {
+      // only report success when the invocation did not end with a container or whisk error
+      logging.debug(this, "health action was successfully invoked")
+    }
+
+    Future.successful({})
+  }
+}
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala
index 57b9f31..b05ae70 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala
@@ -35,12 +35,14 @@ case class UserLimits(invocationsPerMinute: Option[Int] = 
None,
                       concurrentInvocations: Option[Int] = None,
                       firesPerMinute: Option[Int] = None,
                       allowedKinds: Option[Set[String]] = None,
-                      storeActivations: Option[Boolean] = None)
+                      storeActivations: Option[Boolean] = None,
+                      warmedContainerKeepingCount: Option[Int] = None,
+                      warmedContainerKeepingTimeout: Option[String] = None)
 
 object UserLimits extends DefaultJsonProtocol {
   val standardUserLimits = UserLimits()
 
-  implicit val serdes = jsonFormat5(UserLimits.apply)
+  implicit val serdes = jsonFormat7(UserLimits.apply)
 }
 
 protected[core] case class Namespace(name: EntityName, uuid: UUID)
diff --git a/core/invoker/src/main/resources/application.conf 
b/core/invoker/src/main/resources/application.conf
index 9aeea13..2dc80d4 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -155,6 +155,7 @@ whisk {
       #aka 'How long should a container sit idle until we kill it?'
       idle-container = 10 minutes
       pause-grace = 50 milliseconds
+      keeping-duration = 60 minutes
     }
     action-health-check {
       enabled = false # if true, prewarm containers will be pinged 
periodically and warm containers will be pinged once after resumed
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index f50f6ca..1f8476c 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -961,7 +961,9 @@ class ContainerProxy(factory: (TransactionId,
   }
 }
 
-final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, 
pauseGrace: FiniteDuration)
+final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration,
+                                             pauseGrace: FiniteDuration,
+                                             keepingDuration: FiniteDuration)
 final case class ContainerProxyHealthCheckConfig(enabled: Boolean, 
checkPeriod: FiniteDuration, maxFails: Int)
 final case class ContainerProxyActivationErrorLogConfig(applicationErrors: 
Boolean,
                                                         developerErrors: 
Boolean,
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
new file mode 100644
index 0000000..8a0fdc4
--- /dev/null
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.invoker
+
+import akka.Done
+import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, 
CoordinatedShutdown, Props}
+import akka.grpc.GrpcClientSettings
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.Route
+import com.ibm.etcd.api.Event.EventType
+import com.ibm.etcd.client.kv.KvClient.Watch
+import com.ibm.etcd.client.kv.WatchUpdate
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.ack.{ActiveAck, HealthActionAck, 
MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.containerpool._
+import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
+import org.apache.openwhisk.core.containerpool.v2._
+import org.apache.openwhisk.core.database.{UserContext, _}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.{containerPrefix}
+import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys.queue
+import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, SchedulerKeys}
+import org.apache.openwhisk.core.etcd.EtcdType._
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
+import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, 
SchedulerStates}
+import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, 
LeaseKeepAliveService, WatcherService}
+import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
+import org.apache.openwhisk.grpc.{ActivationServiceClient, FetchRequest}
+import org.apache.openwhisk.spi.SpiLoader
+import pureconfig._
+import pureconfig.generic.auto._
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
+import scala.util.{Failure, Success, Try}
+
+case class GrpcServiceConfig(tls: Boolean)
+
+object FPCInvokerReactive extends InvokerProvider {
+
+  override def instance(
+    config: WhiskConfig,
+    instance: InvokerInstanceId,
+    producer: MessageProducer,
+    poolConfig: ContainerPoolConfig,
+    limitsConfig: ConcurrencyLimitConfig)(implicit actorSystem: ActorSystem, 
logging: Logging): InvokerCore =
+    new FPCInvokerReactive(config, instance, producer, poolConfig, 
limitsConfig)
+}
+
+class FPCInvokerReactive(config: WhiskConfig,
+                         instance: InvokerInstanceId,
+                         producer: MessageProducer,
+                         poolConfig: ContainerPoolConfig =
+                           
loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool),
+                         limitsConfig: ConcurrencyLimitConfig = 
loadConfigOrThrow[ConcurrencyLimitConfig](
+                           ConfigKeys.concurrencyLimit))(implicit actorSystem: 
ActorSystem, logging: Logging)
+    extends InvokerCore {
+
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val exe: ExecutionContextExecutor = actorSystem.dispatcher
+  implicit val cfg: WhiskConfig = config
+
+  private val logsProvider = 
SpiLoader.get[LogStoreProvider].instance(actorSystem)
+  logging.info(this, s"LogStoreProvider: ${logsProvider.getClass}")
+
+  private val etcdClient = 
EtcdClient(loadConfigOrThrow[EtcdConfig](ConfigKeys.etcd).hosts)
+
+  private val grpcConfig = 
loadConfigOrThrow[GrpcServiceConfig](ConfigKeys.schedulerGrpcService)
+
+  val watcherService: ActorRef = 
actorSystem.actorOf(WatcherService.props(etcdClient))
+
+  private val leaseService =
+    actorSystem.actorOf(LeaseKeepAliveService.props(etcdClient, instance, 
watcherService))
+
+  private val etcdWorkerFactory =
+    (f: ActorRefFactory) => f.actorOf(EtcdWorker.props(etcdClient, 
leaseService))
+
+  val dataManagementService: ActorRef =
+    actorSystem.actorOf(DataManagementService.props(watcherService, 
etcdWorkerFactory))
+
+  private val warmedSchedulers = TrieMap[String, String]()
+  private var warmUpWatcher: Option[Watch] = None
+
+  /**
+   * Factory used by the ContainerProxy to physically create a new container.
+   *
+   * Create and initialize the container factory before kicking off any other
+   * task or actor because further operation does not make sense if something
+   * goes wrong here. Initialization will throw an exception upon failure.
+   */
+  private val containerFactory =
+    SpiLoader
+      .get[ContainerFactoryProvider]
+      .instance(
+        actorSystem,
+        logging,
+        config,
+        instance,
+        Map(
+          "--cap-drop" -> Set("NET_RAW", "NET_ADMIN"),
+          "--ulimit" -> Set("nofile=1024:1024"),
+          "--pids-limit" -> Set("1024")) ++ logsProvider.containerParameters)
+  containerFactory.init()
+
+  CoordinatedShutdown(actorSystem)
+    .addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "cleanup 
runtime containers") { () =>
+      containerFactory.cleanup()
+      Future.successful(Done)
+    }
+
+  /** Initialize needed databases */
+  private val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, logging)
+
+  private val authStore = WhiskAuthStore.datastore()
+
+  private val namespaceBlacklist: NamespaceBlacklist = new 
NamespaceBlacklist(authStore)
+
+  
Scheduler.scheduleWaitAtMost(loadConfigOrThrow[NamespaceBlacklistConfig](ConfigKeys.blacklist).pollInterval)
 { () =>
+    logging.debug(this, "running background job to update blacklist")
+    namespaceBlacklist.refreshBlacklist()(ec, TransactionId.invoker).andThen {
+      case Success(set) => logging.info(this, s"updated blacklist to 
${set.size} entries")
+      case Failure(t)   => logging.error(this, s"error on updating the 
blacklist: ${t.getMessage}")
+    }
+  }
+
+  val containerProxyTimeoutConfig = 
loadConfigOrThrow[ContainerProxyTimeoutConfig](ConfigKeys.containerProxyTimeouts)
+
+  /**
+   * Looks up the warmed-container limits of a namespace from the auth store.
+   *
+   * @param invocationNamespace the name of the namespace whose identity is fetched
+   * @return the number of warmed containers to keep (1 when the identity defines none) and how
+   *         long to keep them; a missing or unparsable timeout falls back to the configured
+   *         `keepingDuration`. The future fails when the identity lookup fails; missing and
+   *         non-unique namespaces are additionally logged as warnings.
+   */
+  private def getWarmedContainerLimit(invocationNamespace: String): Future[(Int, FiniteDuration)] = {
+    implicit val transid: TransactionId = TransactionId.unknown
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(transid)
+      .map { identity =>
+        val warmedContainerKeepingCount = identity.limits.warmedContainerKeepingCount.getOrElse(1)
+        // the timeout is truncated to whole seconds; parsing errors are treated like a missing value
+        val warmedContainerKeepingTimeout = identity.limits.warmedContainerKeepingTimeout
+          .flatMap(timeout => Try(Duration(timeout).toSeconds.seconds).toOption)
+          .getOrElse(containerProxyTimeoutConfig.keepingDuration)
+        (warmedContainerKeepingCount, warmedContainerKeepingTimeout)
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(transid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(transid)
+      }
+  }
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) 
else None
+    new MessagingActiveAck(producer, instance, sender)
+  }
+
+  // we don't need to store health action results in normal case
+  private val healthActionAck: ActiveAck = new HealthActionAck(producer)
+
+  private val collectLogs = new LogStoreCollector(logsProvider)
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, 
isBlocking: Boolean, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.storeAfterCheck(activation, isBlocking, None, 
context)(tid, notifier = None, logging)
+  }
+
+  private def healthActivationClientFactory(f: ActorRefFactory,
+                                            invocationNamespace: String,
+                                            fqn: FullyQualifiedEntityName,
+                                            rev: DocRevision,
+                                            schedulerHost: String,
+                                            rpcPort: Int,
+                                            containerId: ContainerId): 
ActorRef =
+    f.actorOf(Props(HealthActivationServiceClient()))
+
+  private def healthContainerProxyFactory(f: ActorRefFactory, healthManger: 
ActorRef): ActorRef = {
+    implicit val transId = TransactionId.invokerNanny
+    f.actorOf(
+      FunctionPullingContainerProxy
+        .props(
+          containerFactory.createContainer,
+          entityStore,
+          namespaceBlacklist,
+          WhiskAction.get,
+          dataManagementService,
+          healthActivationClientFactory,
+          healthActionAck,
+          store,
+          collectLogs,
+          getLiveContainerCount,
+          getWarmedContainerLimit,
+          instance,
+          healthManger,
+          poolConfig,
+          containerProxyTimeoutConfig))
+  }
+
+  private val invokerHealthManager =
+    actorSystem.actorOf(
+      InvokerHealthManager.props(instance, healthContainerProxyFactory, 
dataManagementService, entityStore))
+
+  invokerHealthManager ! Enable
+
+  /**
+   * Creates a gRPC [[ActivationServiceClient]] for the given action.
+   *
+   * @param etcd                the etcd client used to look up the queue leader
+   * @param invocationNamespace the namespace the action is invoked in
+   * @param fqn                 the fully qualified name of the action
+   * @param schedulerHost       host of the original scheduler
+   * @param rpcPort             rpc port of the original scheduler
+   * @param tryOtherScheduler   when false, connect to `schedulerHost:rpcPort`; when true, ignore
+   *                            them and connect to the endpoint stored under the queue leader key
+   * @return the client, or a failed future when it could not be created. Every failure is logged,
+   *         including a failed etcd lookup and a missing leader key.
+   */
+  private def activationClientFactory(etcd: EtcdClient)(
+    invocationNamespace: String,
+    fqn: FullyQualifiedEntityName,
+    schedulerHost: String,
+    rpcPort: Int,
+    tryOtherScheduler: Boolean = false): Future[ActivationServiceClient] = {
+
+    // connection settings shared by both branches
+    def grpcSettings(host: String, port: Int): GrpcClientSettings =
+      GrpcClientSettings
+        .connectToServiceAt(host, port)
+        .withTls(grpcConfig.tls)
+
+    if (!tryOtherScheduler) {
+      val setting = grpcSettings(schedulerHost, rpcPort)
+      Future {
+        ActivationServiceClient(setting)
+      }.andThen {
+        case Failure(t) =>
+          logging.error(
+            this,
+            s"unable to create activation client for action ${fqn}: ${t} on original scheduler: ${schedulerHost}:${rpcPort}")
+      }
+    } else {
+      val leaderKey = queue(invocationNamespace, fqn, leader = true)
+      etcd
+        .get(leaderKey)
+        .flatMap { res =>
+          require(!res.getKvsList.isEmpty, s"no queue leader is registered under $leaderKey")
+
+          val endpoint: String = res.getKvsList.get(0).getValue
+          Future
+            .fromTry(SchedulerEndpoints.parse(endpoint))
+            .map { schedulerEndpoint =>
+              ActivationServiceClient(grpcSettings(schedulerEndpoint.host, schedulerEndpoint.rpcPort))
+            }
+        }
+        // attached to the outer future so that etcd failures and a missing leader are logged as well
+        .andThen {
+          case Failure(t) =>
+            logging.error(this, s"unable to create activation client for action ${fqn}: ${t}")
+        }
+    }
+
+  }
+
+  /**
+   * Publishes a container creation ack on the creation-ack topic of the given scheduler.
+   *
+   * A counter metric is emitted for unsuccessful creations only: "reschedule" when the error is
+   * one of `ContainerCreationError.whiskErrors`, "failed" for any other error.
+   *
+   * @param schedulerInstanceId the scheduler whose topic receives the ack
+   * @param creationAckMessage  the ack to send
+   * @return the result of the send; its outcome is logged
+   */
+  private def sendAckToScheduler(schedulerInstanceId: SchedulerInstanceId,
+                                 creationAckMessage: ContainerCreationAckMessage): Future[RecordMetadata] = {
+    val ackTopic = s"${Invoker.topicPrefix}creationAck${schedulerInstanceId.asString}"
+    val hasError = creationAckMessage.error.nonEmpty
+    val shouldReschedule = creationAckMessage.error.exists(ContainerCreationError.whiskErrors.contains(_))
+
+    val failureState =
+      if (shouldReschedule) Some("reschedule")
+      else if (hasError) Some("failed")
+      else None
+    failureState.foreach { state =>
+      MetricEmitter.emitCounterMetric(
+        LoggingMarkers.INVOKER_CONTAINER_CREATE(creationAckMessage.action.toString, state))
+    }
+
+    producer.send(ackTopic, creationAckMessage).andThen {
+      case Failure(t) =>
+        logging.error(
+          this,
+          s"failed to send container creation ack message(${creationAckMessage.creationId}) for ${creationAckMessage.invocationNamespace}/${creationAckMessage.action} to scheduler: ${t.getMessage}")
+      case Success(_) =>
+        val outcome =
+          if (shouldReschedule) "rescheduling"
+          else if (hasError) "failed"
+          else "success"
+        logging.info(
+          this,
+          s"Posted ${outcome} ack of container creation ${creationAckMessage.creationId} for ${creationAckMessage.invocationNamespace}/${creationAckMessage.action}")
+    }
+  }
+
+  /** Creates a ContainerProxy Actor when being called. */
+  private val childFactory = (f: ActorRefFactory) => {
+    implicit val transId = TransactionId.invokerNanny
+    f.actorOf(
+      FunctionPullingContainerProxy
+        .props(
+          containerFactory.createContainer,
+          entityStore,
+          namespaceBlacklist,
+          WhiskAction.get,
+          dataManagementService,
+          clientProxyFactory,
+          ack,
+          store,
+          collectLogs,
+          getLiveContainerCount,
+          getWarmedContainerLimit,
+          instance,
+          invokerHealthManager,
+          poolConfig,
+          containerProxyTimeoutConfig))
+  }
+
+  /** Creates an ActivationClientProxy Actor when being called. */
+  private def clientProxyFactory(f: ActorRefFactory,
+                                 invocationNamespace: String,
+                                 fqn: FullyQualifiedEntityName,
+                                 rev: DocRevision,
+                                 schedulerHost: String,
+                                 rpcPort: Int,
+                                 containerId: ContainerId): ActorRef = {
+    implicit val transId = TransactionId.invokerNanny
+    f.actorOf(
+      ActivationClientProxy
+        .props(invocationNamespace, fqn, rev, schedulerHost, rpcPort, 
containerId, activationClientFactory(etcdClient)))
+  }
+
+  val prewarmingConfigs: List[PrewarmingConfig] = {
+    ExecManifest.runtimesManifest.stemcells.flatMap {
+      case (mf, cells) =>
+        cells.map { cell =>
+          PrewarmingConfig(cell.initialCount, new CodeExecAsString(mf, "", 
None), cell.memory)
+        }
+    }.toList
+  }
+
+  private val pool =
+    actorSystem.actorOf(
+      ContainerPoolV2
+        .props(childFactory, invokerHealthManager, poolConfig, instance, 
prewarmingConfigs, sendAckToScheduler))
+
+  /**
+   * Counts the containers registered in etcd for one revision of an action.
+   *
+   * The result is the sum of the key counts under the namespace, in-progress and warmed
+   * container prefixes. The three counts are requested concurrently.
+   *
+   * @param invocationNamespace the namespace the action is invoked in
+   * @param fqn                 the fully qualified name of the action
+   * @param revision            the action revision
+   * @return the total count, or a failed future when any of the lookups fails
+   */
+  private def getLiveContainerCount(invocationNamespace: String,
+                                    fqn: FullyQualifiedEntityName,
+                                    revision: DocRevision): Future[Long] = {
+    val namespacePrefix = containerPrefix(ContainerKeys.namespacePrefix, invocationNamespace, fqn, Some(revision))
+    val inProgressPrefix = containerPrefix(ContainerKeys.inProgressPrefix, invocationNamespace, fqn, Some(revision))
+    val warmedPrefix = containerPrefix(ContainerKeys.warmedPrefix, invocationNamespace, fqn, Some(revision))
+
+    // start all lookups before combining them; a Future created inside the for would only
+    // start once the previous one has completed
+    val namespaceCountF = etcdClient.getCount(namespacePrefix)
+    val inProgressCountF = etcdClient.getCount(inProgressPrefix)
+    val warmedCountF = etcdClient.getCount(warmedPrefix)
+    for {
+      namespaceCount <- namespaceCountF
+      inProgressCount <- inProgressCountF
+      warmedCount <- warmedCountF
+    } yield {
+      namespaceCount + inProgressCount + warmedCount
+    }
+  }
+
+  /** Initialize message consumers */
+  private val msgProvider = SpiLoader.get[MessagingProvider]
+  // maximum number of peeked messages - increasing this value improves concurrent usage, but adds risk for message loss in case of crash
+  private val maxPeek = 
loadConfigOrThrow[Int](ConfigKeys.containerCreationMaxPeek)
+  private var consumer: Option[ContainerMessageConsumer] = Some(
+    new ContainerMessageConsumer(
+      instance,
+      pool,
+      entityStore,
+      cfg,
+      msgProvider,
+      longPollDuration = 1.second,
+      maxPeek,
+      sendAckToScheduler))
+
+  /**
+   * Enables this invoker.
+   *
+   * Sends `Enable` to the health manager and the container pool, creates a new container
+   * message consumer when none exists (disable() drops it), and runs the scheduler warm-up.
+   *
+   * @return a route completing with a plain-text confirmation
+   */
+  override def enable(): Route = {
+    invokerHealthManager ! Enable
+    pool ! Enable
+    // re-enable consumer
+    if (consumer.isEmpty)
+      consumer = Some(
+        new ContainerMessageConsumer(
+          instance,
+          pool,
+          entityStore,
+          cfg,
+          msgProvider,
+          longPollDuration = 1.second,
+          maxPeek,
+          sendAckToScheduler))
+    // registers the scheduler watcher again if disable() closed it
+    warmUp()
+    complete("Success enable invoker")
+  }
+
+  /**
+   * Disables this invoker.
+   *
+   * Sends `GracefulShutdown` to the health manager and the container pool, closes and drops the
+   * container message consumer, and closes and drops the scheduler watcher. The record of
+   * already warmed schedulers is left untouched.
+   *
+   * @return a route completing with a plain-text confirmation
+   */
+  override def disable(): Route = {
+    invokerHealthManager ! GracefulShutdown
+    pool ! GracefulShutdown
+    consumer.foreach(_.close())
+    consumer = None
+    warmUpWatcher.foreach(_.close())
+    warmUpWatcher = None
+    complete("Successfully disabled invoker")
+  }
+
+  /**
+   * Sends `AdjustPrewarmedContainer` to the container pool.
+   *
+   * @return a route completing with a plain-text confirmation; the message is only sent to the
+   *         pool, the route does not wait for any result
+   */
+  override def backfillPrewarm(): Route = {
+    pool ! AdjustPrewarmedContainer
+    complete("backfilling prewarm container")
+  }
+
+  private val warmUpFetchRequest = FetchRequest(
+    TransactionId(TransactionId.generateTid()).serialize,
+    InvokerHealthManager.healthActionIdentity.namespace.name.asString,
+    WarmUp.warmUpAction.serialize,
+    DocRevision.empty.serialize) // a warm up fetch request which contains 
nothing
+
+  /**
+   * Warms up the gRPC connection with a scheduler by sending it the warm-up fetch request
+   * through a temporary client.
+   *
+   * The client is closed once the request has completed, whatever its outcome. A failed
+   * request is logged as a warning instead of being reported as a successful warm-up.
+   *
+   * @param scheduler the endpoints of the scheduler to connect to
+   * @return the future of the fetch request
+   */
+  private def warmUpScheduler(scheduler: SchedulerEndpoints) = {
+    val setting =
+      GrpcClientSettings
+        .connectToServiceAt(scheduler.host, scheduler.rpcPort)
+        .withTls(grpcConfig.tls)
+    val client = ActivationServiceClient(setting)
+    client.fetchActivation(warmUpFetchRequest).andThen {
+      case result =>
+        result match {
+          case Success(_) => logging.info(this, s"Warmed up scheduler $scheduler")
+          case Failure(t) => logging.warn(this, s"Failed to warm up scheduler $scheduler: $t")
+        }
+        client.close()
+    }
+  }
+
+  /**
+   * Warms up the gRPC connections with the schedulers registered in etcd.
+   *
+   * Registers a watcher on the scheduler key prefix unless one is already registered: a PUT
+   * warms up a scheduler that is not recorded yet, a DELETE removes its record. It then reads
+   * all currently registered schedulers and warms up the ones that are not recorded yet.
+   *
+   * A scheduler is only recorded as warmed when its state could be parsed. Unparsable states
+   * and a failed etcd read are logged as errors.
+   */
+  private def warmUp(): Unit = {
+    implicit val transId = TransactionId.warmUp
+    if (warmUpWatcher.isEmpty)
+      warmUpWatcher = Some(etcdClient.watch(SchedulerKeys.prefix, true) { res: WatchUpdate =>
+        res.getEvents.asScala.foreach {
+          event =>
+            event.getType match {
+              case EventType.DELETE =>
+                val key = event.getPrevKv.getKey
+                warmedSchedulers.remove(key)
+              case EventType.PUT =>
+                val key = event.getKv.getKey
+                val value = event.getKv.getValue
+                SchedulerStates
+                  .parse(value)
+                  .map { state =>
+                    // warm up new scheduler
+                    warmedSchedulers.getOrElseUpdate(key, {
+                      logging.info(this, s"Warm up scheduler ${state.sid}")
+                      warmUpScheduler(state.endpoints)
+                      value
+                    })
+                  }
+                  .recover {
+                    // report unparsable states the same way as for the initial read below
+                    case t =>
+                      logging.error(this, s"Unexpected error $t")
+                  }
+              case _ =>
+            }
+        }
+      })
+
+    etcdClient
+      .getPrefix(SchedulerKeys.prefix)
+      .map { res =>
+        res.getKvsList.asScala.foreach { kv =>
+          val scheduler = kv.getKey
+          // parse first so that a scheduler with an unparsable state is not recorded as warmed
+          SchedulerStates.parse(kv.getValue) match {
+            case Success(state) =>
+              warmedSchedulers.getOrElseUpdate(
+                scheduler, {
+                  logging.info(this, s"Warm up scheduler $scheduler")
+                  warmUpScheduler(state.endpoints)
+                  kv.getValue
+                })
+            case Failure(t) =>
+              logging.error(this, s"Unexpected error $t")
+          }
+        }
+      }
+      .andThen {
+        case Failure(t) =>
+          logging.error(this, s"failed to get schedulers to warm up: $t")
+      }
+  }
+  warmUp()
+}
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index 437beb3..6de751f 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -220,6 +220,7 @@ trait InvokerProvider extends Spi {
 trait InvokerCore {
   def enable(): Route
   def disable(): Route
+  def backfillPrewarm(): Route
 }
 
 /**
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index 3177cb7..f2d36b1 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -307,4 +307,8 @@ class InvokerReactive(
     complete("not supported")
   }
 
+  override def backfillPrewarm(): Route = {
+    complete("not supported")
+  }
+
 }
diff --git a/tests/src/test/resources/application.conf.j2 
b/tests/src/test/resources/application.conf.j2
index 6b69172..352fa16 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -140,13 +140,15 @@ whisk {
 
     scheduler {
         protocol = "{{ scheduler.protocol }}"
+        grpc {
+            tls = "{{ scheduler.grpc.tls | default('false') | lower }}"
+        }
         queue-manager {
             max-scheduling-time     = "{{ 
scheduler.queueManager.maxSchedulingTime }}"
             max-retries-to-get-queue    = "{{ 
scheduler.queueManager.maxRetriesToGetQueue }}"
         }
         max-peek = "{{ scheduler.maxPeek }}"
     }
-
 }
 
 #test-only overrides so that tests can override defaults in application.conf 
(todo: move all defaults to reference.conf)
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
index 9b91d55..c779320 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
@@ -117,7 +117,7 @@ class FunctionPullingContainerProxyTests
       false,
       1.second)
 
-  val timeoutConfig = ContainerProxyTimeoutConfig(5.seconds, 5.seconds)
+  val timeoutConfig = ContainerProxyTimeoutConfig(5.seconds, 5.seconds, 
5.seconds)
 
   val messageTransId = TransactionId(TransactionId.testing.meta.id)
 
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
index 153e228..1095128 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
@@ -130,6 +130,10 @@ class TestInvokerReactive extends InvokerCore with 
BasicHttpService {
     complete("")
   }
 
+  override def backfillPrewarm(): Route = {
+    complete("")
+  }
+
   def reset(): Unit = {
     enableCount = 0
     disableCount = 0
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
index 8cf8d91..e387cd6 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
@@ -129,6 +129,10 @@ class TestFPCInvokerReactive extends InvokerCore with 
BasicHttpService {
     complete("")
   }
 
+  override def backfillPrewarm(): Route = {
+    complete("")
+  }
+
   def reset(): Unit = {
     enableCount = 0
     disableCount = 0

Reply via email to