This is an automated email from the ASF dual-hosted git repository.
ningyougang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new e172168 [New Scheduler] Implement FPCInvokerReactive (#5125)
e172168 is described below
commit e172168fc5a55ba0c8443adbc91629291a1ca321
Author: ningyougang <[email protected]>
AuthorDate: Thu Jan 13 16:17:21 2022 +0800
[New Scheduler] Implement FPCInvokerReactive (#5125)
* Implement FPCInvokerReactive
* Fix review points
* Remove unnecessary code
---
ansible/group_vars/all | 2 +
.../org/apache/openwhisk/common/Logging.scala | 2 +
.../org/apache/openwhisk/core/WhiskConfig.scala | 2 +
.../openwhisk/core/ack/HealthActionAck.scala | 45 ++
.../apache/openwhisk/core/entity/Identity.scala | 6 +-
core/invoker/src/main/resources/application.conf | 1 +
.../core/containerpool/ContainerProxy.scala | 4 +-
.../core/invoker/FPCInvokerReactive.scala | 481 +++++++++++++++++++++
.../apache/openwhisk/core/invoker/Invoker.scala | 1 +
.../openwhisk/core/invoker/InvokerReactive.scala | 4 +
tests/src/test/resources/application.conf.j2 | 4 +-
.../test/FunctionPullingContainerProxyTests.scala | 2 +-
.../invoker/test/DefaultInvokerServerTests.scala | 4 +
.../core/invoker/test/FPCInvokerServerTests.scala | 4 +
14 files changed, 557 insertions(+), 5 deletions(-)
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index bdd1bec..0f9b107 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -465,6 +465,8 @@ etcd_connect_string: "{% set ret = [] %}\
scheduler:
protocol: "{{ scheduler_protocol | default('http') }}"
+ grpc:
+ tls: "{{ scheduler_grpc_tls | default(false) }}"
maxPeek: "{{ scheduler_max_peek | default(128) }}"
queueManager:
maxSchedulingTime: "{{ scheduler_maxSchedulingTime | default('20 second')
}}"
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index e1859de..5bbf43a 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -510,6 +510,8 @@ object LoggingMarkers {
"initiator" -> invocationNamespace,
"namespace" -> namespace,
"action" -> action))(MeasurementUnit.none)
+ // Counter marker for the invoker component under the "creation" action, tagged with the
+ // action name and an outcome state supplied by the caller.
+ def INVOKER_CONTAINER_CREATE(action: String, state: String) =
+ LogMarkerToken(invoker, "creation", counter, None, Map("action" -> action,
"state" -> state))(MeasurementUnit.none)
val INVOKER_CONTAINER_HEALTH = LogMarkerToken(invoker, "containerHealth",
start)(MeasurementUnit.time.milliseconds)
val INVOKER_CONTAINER_HEALTH_FAILED_WARM =
LogMarkerToken(invoker, "containerHealthFailed", counter, Some("warm"),
Map("containerState" -> "warm"))(
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 484da2e..e50fbb4 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -243,6 +243,7 @@ object ConfigKeys {
val runtimesRegistry = s"$containerFactory.runtimes-registry"
val userImagesRegistry = s"$containerFactory.user-images-registry"
val containerPool = "whisk.container-pool"
+ val containerCreationMaxPeek = "whisk.invoker.container-creation.max-peek"
val blacklist = "whisk.blacklist"
val kubernetes = "whisk.kubernetes"
@@ -294,6 +295,7 @@ object ConfigKeys {
val azBlob = "whisk.azure-blob"
+ val schedulerGrpcService = "whisk.scheduler.grpc"
val schedulerMaxPeek = "whisk.scheduler.max-peek"
val schedulerQueue = "whisk.scheduler.queue"
val schedulerQueueManager = "whisk.scheduler.queue-manager"
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/ack/HealthActionAck.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/HealthActionAck.scala
new file mode 100644
index 0000000..83c4acd
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/HealthActionAck.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.ack
+
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.core.connector.{AcknowledegmentMessage,
MessageProducer}
+import org.apache.openwhisk.core.entity.{ControllerInstanceId, UUID,
WhiskActivation}
+import spray.json.DefaultJsonProtocol._
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+ * An [[ActiveAck]] used for health-check activations.
+ *
+ * This implementation sends and stores nothing; it only inspects the activation response so that a
+ * failed health action becomes visible in the logs. The returned future is always successful.
+ *
+ * @param producer message producer (not used by this implementation)
+ */
+class HealthActionAck(producer: MessageProducer)(implicit logging: Logging, ec: ExecutionContext) extends ActiveAck {
+  override def apply(tid: TransactionId,
+                     activationResult: WhiskActivation,
+                     blockingInvoke: Boolean,
+                     controllerInstance: ControllerInstanceId,
+                     userId: UUID,
+                     acknowledegment: AcknowledegmentMessage): Future[Any] = {
+    implicit val transid: TransactionId = tid
+
+    val response = activationResult.response
+    if (response.isContainerError || response.isWhiskError) {
+      val actionPath =
+        activationResult.annotations.getAs[String](WhiskActivation.pathAnnotation).getOrElse("unknown_path")
+      logging.error(this, s"Failed to invoke action $actionPath, error: ${response.toString}")
+    } else {
+      // report success only when the response is neither a container error nor a whisk error
+      logging.debug(this, s"health action was successfully invoked")
+    }
+
+    Future.successful(())
+  }
+}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala
index 57b9f31..b05ae70 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala
@@ -35,12 +35,14 @@ case class UserLimits(invocationsPerMinute: Option[Int] =
None,
concurrentInvocations: Option[Int] = None,
firesPerMinute: Option[Int] = None,
allowedKinds: Option[Set[String]] = None,
- storeActivations: Option[Boolean] = None)
+ storeActivations: Option[Boolean] = None,
+ warmedContainerKeepingCount: Option[Int] = None,
+ warmedContainerKeepingTimeout: Option[String] = None)
object UserLimits extends DefaultJsonProtocol {
val standardUserLimits = UserLimits()
- implicit val serdes = jsonFormat5(UserLimits.apply)
+ implicit val serdes = jsonFormat7(UserLimits.apply)
}
protected[core] case class Namespace(name: EntityName, uuid: UUID)
diff --git a/core/invoker/src/main/resources/application.conf
b/core/invoker/src/main/resources/application.conf
index 9aeea13..2dc80d4 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -155,6 +155,7 @@ whisk {
#aka 'How long should a container sit idle until we kill it?'
idle-container = 10 minutes
pause-grace = 50 milliseconds
+ keeping-duration = 60 minutes
}
action-health-check {
enabled = false # if true, prewarm containers will be pinged
periodically and warm containers will be pinged once after resumed
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index f50f6ca..1f8476c 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -961,7 +961,9 @@ class ContainerProxy(factory: (TransactionId,
}
}
-final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration,
pauseGrace: FiniteDuration)
+/**
+ * Timeout settings for container proxies.
+ *
+ * `keepingDuration` is used by FPCInvokerReactive as the fallback keeping timeout for warmed
+ * containers when a namespace defines no usable `warmedContainerKeepingTimeout` limit.
+ */
+final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration,
+ pauseGrace: FiniteDuration,
+ keepingDuration: FiniteDuration)
final case class ContainerProxyHealthCheckConfig(enabled: Boolean,
checkPeriod: FiniteDuration, maxFails: Int)
final case class ContainerProxyActivationErrorLogConfig(applicationErrors:
Boolean,
developerErrors:
Boolean,
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
new file mode 100644
index 0000000..8a0fdc4
--- /dev/null
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.invoker
+
+import akka.Done
+import akka.actor.{ActorRef, ActorRefFactory, ActorSystem,
CoordinatedShutdown, Props}
+import akka.grpc.GrpcClientSettings
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.Route
+import com.ibm.etcd.api.Event.EventType
+import com.ibm.etcd.client.kv.KvClient.Watch
+import com.ibm.etcd.client.kv.WatchUpdate
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.ack.{ActiveAck, HealthActionAck,
MessagingActiveAck, UserEventSender}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.containerpool._
+import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
+import org.apache.openwhisk.core.containerpool.v2._
+import org.apache.openwhisk.core.database.{UserContext, _}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.{containerPrefix}
+import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys.queue
+import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, SchedulerKeys}
+import org.apache.openwhisk.core.etcd.EtcdType._
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
+import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints,
SchedulerStates}
+import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker,
LeaseKeepAliveService, WatcherService}
+import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
+import org.apache.openwhisk.grpc.{ActivationServiceClient, FetchRequest}
+import org.apache.openwhisk.spi.SpiLoader
+import pureconfig._
+import pureconfig.generic.auto._
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
+import scala.util.{Failure, Success, Try}
+
+/** gRPC settings loaded from `ConfigKeys.schedulerGrpcService`; `tls` is passed to `GrpcClientSettings.withTls`. */
+case class GrpcServiceConfig(tls: Boolean)
+
+/** [[InvokerProvider]] whose invoker core is an [[FPCInvokerReactive]]. */
+object FPCInvokerReactive extends InvokerProvider {
+
+ /** Builds the invoker core by passing every argument straight to the FPCInvokerReactive constructor. */
+ override def instance(
+ config: WhiskConfig,
+ instance: InvokerInstanceId,
+ producer: MessageProducer,
+ poolConfig: ContainerPoolConfig,
+ limitsConfig: ConcurrencyLimitConfig)(implicit actorSystem: ActorSystem,
logging: Logging): InvokerCore =
+ new FPCInvokerReactive(config, instance, producer, poolConfig,
limitsConfig)
+}
+
+class FPCInvokerReactive(config: WhiskConfig,
+ instance: InvokerInstanceId,
+ producer: MessageProducer,
+ poolConfig: ContainerPoolConfig =
+
loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool),
+ limitsConfig: ConcurrencyLimitConfig =
loadConfigOrThrow[ConcurrencyLimitConfig](
+ ConfigKeys.concurrencyLimit))(implicit actorSystem:
ActorSystem, logging: Logging)
+ extends InvokerCore {
+
+ implicit val ec: ExecutionContext = actorSystem.dispatcher
+ implicit val exe: ExecutionContextExecutor = actorSystem.dispatcher
+ implicit val cfg: WhiskConfig = config
+
+ private val logsProvider =
SpiLoader.get[LogStoreProvider].instance(actorSystem)
+ logging.info(this, s"LogStoreProvider: ${logsProvider.getClass}")
+
+ private val etcdClient =
EtcdClient(loadConfigOrThrow[EtcdConfig](ConfigKeys.etcd).hosts)
+
+ private val grpcConfig =
loadConfigOrThrow[GrpcServiceConfig](ConfigKeys.schedulerGrpcService)
+
+ val watcherService: ActorRef =
actorSystem.actorOf(WatcherService.props(etcdClient))
+
+ private val leaseService =
+ actorSystem.actorOf(LeaseKeepAliveService.props(etcdClient, instance,
watcherService))
+
+ private val etcdWorkerFactory =
+ (f: ActorRefFactory) => f.actorOf(EtcdWorker.props(etcdClient,
leaseService))
+
+ val dataManagementService: ActorRef =
+ actorSystem.actorOf(DataManagementService.props(watcherService,
etcdWorkerFactory))
+
+ // scheduler etcd key -> raw etcd value for every scheduler key already handled by warmUp()
+ private val warmedSchedulers = TrieMap[String, String]()
+ // handle of the etcd watch on scheduler keys; set by warmUp(), closed and reset to None by disable()
+ private var warmUpWatcher: Option[Watch] = None
+
+ /**
+ * Factory used by the ContainerProxy to physically create a new container.
+ *
+ * Create and initialize the container factory before kicking off any other
+ * task or actor because further operation does not make sense if something
+ * goes wrong here. Initialization will throw an exception upon failure.
+ */
+ private val containerFactory =
+ SpiLoader
+ .get[ContainerFactoryProvider]
+ .instance(
+ actorSystem,
+ logging,
+ config,
+ instance,
+ Map(
+ "--cap-drop" -> Set("NET_RAW", "NET_ADMIN"),
+ "--ulimit" -> Set("nofile=1024:1024"),
+ "--pids-limit" -> Set("1024")) ++ logsProvider.containerParameters)
+ containerFactory.init()
+
+ // Register a shutdown task that calls containerFactory.cleanup() before the actor system terminates.
+ // cleanup() is invoked synchronously inside the task; the task then reports completion immediately.
+ CoordinatedShutdown(actorSystem)
+ .addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "cleanup
runtime containers") { () =>
+ containerFactory.cleanup()
+ Future.successful(Done)
+ }
+
+ /** Initialize needed databases */
+ private val entityStore = WhiskEntityStore.datastore()
+ private val activationStore =
+ SpiLoader.get[ActivationStoreProvider].instance(actorSystem, logging)
+
+ private val authStore = WhiskAuthStore.datastore()
+
+ private val namespaceBlacklist: NamespaceBlacklist = new
NamespaceBlacklist(authStore)
+
+ // Background job: refresh the namespace blacklist repeatedly, using the poll interval from the
+ // blacklist config, and log the outcome of every refresh.
+
Scheduler.scheduleWaitAtMost(loadConfigOrThrow[NamespaceBlacklistConfig](ConfigKeys.blacklist).pollInterval)
{ () =>
+ logging.debug(this, "running background job to update blacklist")
+ namespaceBlacklist.refreshBlacklist()(ec, TransactionId.invoker).andThen {
+ case Success(set) => logging.info(this, s"updated blacklist to
${set.size} entries")
+ case Failure(t) => logging.error(this, s"error on updating the
blacklist: ${t.getMessage}")
+ }
+ }
+
+ val containerProxyTimeoutConfig =
loadConfigOrThrow[ContainerProxyTimeoutConfig](ConfigKeys.containerProxyTimeouts)
+
+  /**
+   * Looks up the warmed-container limits of a namespace in the auth store.
+   *
+   * @param invocationNamespace name of the namespace whose identity is fetched
+   * @return how many warmed containers to keep (1 when the namespace sets no count) and for how long
+   *         (the configured `keepingDuration` when the namespace sets no timeout or one that cannot
+   *         be converted to a finite number of seconds); the future fails if the identity lookup fails
+   */
+  private def getWarmedContainerLimit(invocationNamespace: String): Future[(Int, FiniteDuration)] = {
+    implicit val transid = TransactionId.unknown
+
+    val limits = Identity.get(authStore, EntityName(invocationNamespace))(transid).map { identity =>
+      val userLimits = identity.limits
+      val keepingCount = userLimits.warmedContainerKeepingCount.getOrElse(1)
+      // a missing value and a value that cannot be converted both fall back to the configured default
+      val keepingTimeout = userLimits.warmedContainerKeepingTimeout
+        .flatMap(raw => Try(Duration(raw).toSeconds.seconds).toOption)
+        .getOrElse(containerProxyTimeoutConfig.keepingDuration)
+      (keepingCount, keepingTimeout)
+    }
+
+    // the failure is only logged here; it is still propagated to the caller
+    limits.andThen {
+      case Failure(_: NoDocumentException) =>
+        logging.warn(this, s"namespace does not exist: $invocationNamespace")(transid)
+      case Failure(_: IllegalStateException) =>
+        logging.warn(this, s"namespace is not unique: $invocationNamespace")(transid)
+    }
+  }
+
+ // Ack used for regular activations; a UserEventSender is attached only when user events are enabled.
+ private val ack = {
+ val sender = if (UserEvents.enabled) Some(new UserEventSender(producer))
else None
+ new MessagingActiveAck(producer, instance, sender)
+ }
+
+ // we don't need to store health action results in normal case
+ private val healthActionAck: ActiveAck = new HealthActionAck(producer)
+
+ // Log collector backed by the LogStoreProvider loaded above.
+ private val collectLogs = new LogStoreCollector(logsProvider)
+
+ /** Stores an activation in the database. */
+ private val store = (tid: TransactionId, activation: WhiskActivation,
isBlocking: Boolean, context: UserContext) => {
+ implicit val transid: TransactionId = tid
+ // the second parameter list is passed explicitly: transaction id, no notifier, logger
+ activationStore.storeAfterCheck(activation, isBlocking, None,
context)(tid, notifier = None, logging)
+ }
+
+ // Client factory handed to the health-check container proxy. Only `f` is used; the remaining
+ // parameters mirror the signature of clientProxyFactory and are ignored here.
+ private def healthActivationClientFactory(f: ActorRefFactory,
+ invocationNamespace: String,
+ fqn: FullyQualifiedEntityName,
+ rev: DocRevision,
+ schedulerHost: String,
+ rpcPort: Int,
+ containerId: ContainerId):
ActorRef =
+ f.actorOf(Props(HealthActivationServiceClient()))
+
+  /**
+   * Creates the container proxy actor used for health checks.
+   *
+   * It is wired like the regular proxies except that it uses the health-specific client factory and
+   * ack, and reports to the health manager passed in.
+   *
+   * @param f factory the actor is created with
+   * @param healthManger health manager actor handed to the proxy
+   * @return reference to the new proxy actor
+   */
+  private def healthContainerProxyFactory(f: ActorRefFactory, healthManger: ActorRef): ActorRef = {
+    implicit val transId = TransactionId.invokerNanny
+    val proxyProps = FunctionPullingContainerProxy.props(
+      containerFactory.createContainer,
+      entityStore,
+      namespaceBlacklist,
+      WhiskAction.get,
+      dataManagementService,
+      healthActivationClientFactory,
+      healthActionAck,
+      store,
+      collectLogs,
+      getLiveContainerCount,
+      getWarmedContainerLimit,
+      instance,
+      healthManger,
+      poolConfig,
+      containerProxyTimeoutConfig)
+    f.actorOf(proxyProps)
+  }
+
+ // Health manager actor; it creates its own health-check proxies through healthContainerProxyFactory.
+ private val invokerHealthManager =
+ actorSystem.actorOf(
+ InvokerHealthManager.props(instance, healthContainerProxyFactory,
dataManagementService, entityStore))
+
+ // sent once at construction time; enable() sends it again
+ invokerHealthManager ! Enable
+
+  /**
+   * Creates a gRPC client for fetching activations from a scheduler.
+   *
+   * @param etcd                etcd client used to look up the queue leader
+   * @param invocationNamespace namespace used to build the queue leader key
+   * @param fqn                 fully qualified name of the action
+   * @param schedulerHost       host of the original scheduler
+   * @param rpcPort             gRPC port of the original scheduler
+   * @param tryOtherScheduler   when false, connect to `schedulerHost:rpcPort`; when true, ignore them and
+   *                            connect to the endpoint stored under the queue's leader key in etcd
+   * @return the client; the future fails when the client cannot be created, the etcd lookup fails,
+   *         the leader key has no value, or the stored endpoint cannot be parsed
+   */
+  private def activationClientFactory(etcd: EtcdClient)(
+    invocationNamespace: String,
+    fqn: FullyQualifiedEntityName,
+    schedulerHost: String,
+    rpcPort: Int,
+    tryOtherScheduler: Boolean = false): Future[ActivationServiceClient] = {
+
+    // single place where the client settings (including TLS) are assembled
+    def connectTo(host: String, port: Int): ActivationServiceClient =
+      ActivationServiceClient(
+        GrpcClientSettings
+          .connectToServiceAt(host, port)
+          .withTls(grpcConfig.tls))
+
+    if (!tryOtherScheduler) {
+      Future {
+        connectTo(schedulerHost, rpcPort)
+      }.andThen {
+        case Failure(t) =>
+          logging.error(
+            this,
+            s"unable to create activation client for action ${fqn}: ${t} on original scheduler: ${schedulerHost}:${rpcPort}")
+      }
+    } else {
+      val leaderKey = queue(invocationNamespace, fqn, leader = true)
+      etcd
+        .get(leaderKey)
+        .flatMap { res =>
+          // a failed requirement surfaces as a failed future because it is raised inside flatMap
+          require(!res.getKvsList.isEmpty, s"no queue leader found for action ${fqn}")
+
+          val endpoint: String = res.getKvsList.get(0).getValue
+          Future.fromTry(SchedulerEndpoints.parse(endpoint))
+        }
+        .map { schedulerEndpoint =>
+          connectTo(schedulerEndpoint.host, schedulerEndpoint.rpcPort)
+        }
+        // attached to the whole chain so that a failed etcd lookup or a missing leader key is logged too
+        .andThen {
+          case Failure(t) =>
+            logging.error(this, s"unable to create activation client for action ${fqn}: ${t}")
+        }
+    }
+
+  }
+
+  /**
+   * Sends a container-creation ack to the creationAck topic of the given scheduler.
+   *
+   * Before sending, a counter metric is emitted for acks that carry an error: "reschedule" when the
+   * error is one of `ContainerCreationError.whiskErrors`, "failed" for any other error.
+   *
+   * @param schedulerInstanceId scheduler whose creationAck topic receives the message
+   * @param creationAckMessage  ack to send
+   * @return the result of the producer's send
+   */
+  private def sendAckToScheduler(schedulerInstanceId: SchedulerInstanceId,
+                                 creationAckMessage: ContainerCreationAckMessage): Future[RecordMetadata] = {
+    val topic = s"${Invoker.topicPrefix}creationAck${schedulerInstanceId.asString}"
+    val reschedulable = creationAckMessage.error.exists(ContainerCreationError.whiskErrors.contains(_))
+    val hasError = creationAckMessage.error.nonEmpty
+
+    // "reschedule" takes precedence over "failed"; an ack without an error emits no metric
+    val metricState = if (reschedulable) Some("reschedule") else if (hasError) Some("failed") else None
+    metricState.foreach { state =>
+      MetricEmitter.emitCounterMetric(
+        LoggingMarkers.INVOKER_CONTAINER_CREATE(creationAckMessage.action.toString, state))
+    }
+
+    producer.send(topic, creationAckMessage).andThen {
+      case Success(_) =>
+        val outcome = if (reschedulable) "rescheduling" else if (hasError) "failed" else "success"
+        logging.info(
+          this,
+          s"Posted $outcome ack of container creation ${creationAckMessage.creationId} for ${creationAckMessage.invocationNamespace}/${creationAckMessage.action}")
+      case Failure(t) =>
+        logging.error(
+          this,
+          s"failed to send container creation ack message(${creationAckMessage.creationId}) for ${creationAckMessage.invocationNamespace}/${creationAckMessage.action} to scheduler: ${t.getMessage}")
+    }
+  }
+
+  /**
+   * Factory for regular container proxy actors: each call creates a new
+   * FunctionPullingContainerProxy with the given actor factory.
+   */
+  private val childFactory = (f: ActorRefFactory) => {
+    implicit val transId = TransactionId.invokerNanny
+    val proxyProps = FunctionPullingContainerProxy.props(
+      containerFactory.createContainer,
+      entityStore,
+      namespaceBlacklist,
+      WhiskAction.get,
+      dataManagementService,
+      clientProxyFactory,
+      ack,
+      store,
+      collectLogs,
+      getLiveContainerCount,
+      getWarmedContainerLimit,
+      instance,
+      invokerHealthManager,
+      poolConfig,
+      containerProxyTimeoutConfig)
+    f.actorOf(proxyProps)
+  }
+
+  /**
+   * Creates an ActivationClientProxy actor for one container.
+   *
+   * All arguments except `f` are forwarded to `ActivationClientProxy.props`, together with an
+   * activation client factory bound to this invoker's etcd client.
+   */
+  private def clientProxyFactory(f: ActorRefFactory,
+                                 invocationNamespace: String,
+                                 fqn: FullyQualifiedEntityName,
+                                 rev: DocRevision,
+                                 schedulerHost: String,
+                                 rpcPort: Int,
+                                 containerId: ContainerId): ActorRef = {
+    implicit val transId = TransactionId.invokerNanny
+    val clientProps = ActivationClientProxy.props(
+      invocationNamespace,
+      fqn,
+      rev,
+      schedulerHost,
+      rpcPort,
+      containerId,
+      activationClientFactory(etcdClient))
+    f.actorOf(clientProps)
+  }
+
+  /** One prewarming config per stem cell declared in the runtimes manifest, each with empty code. */
+  val prewarmingConfigs: List[PrewarmingConfig] =
+    for {
+      (manifest, stemCells) <- ExecManifest.runtimesManifest.stemcells.toList
+      stemCell <- stemCells
+    } yield PrewarmingConfig(stemCell.initialCount, new CodeExecAsString(manifest, "", None), stemCell.memory)
+
+ // Container pool actor; it creates proxies through childFactory and is given sendAckToScheduler
+ // as its callback for creation acks.
+ private val pool =
+ actorSystem.actorOf(
+ ContainerPoolV2
+ .props(childFactory, invokerHealthManager, poolConfig, instance,
prewarmingConfigs, sendAckToScheduler))
+
+  /**
+   * Counts the containers registered in etcd for one revision of an action.
+   *
+   * @param invocationNamespace namespace used to build the etcd key prefixes
+   * @param fqn                 fully qualified name of the action
+   * @param revision            action revision
+   * @return sum of the key counts under the namespace, in-progress and warmed container prefixes
+   */
+  private def getLiveContainerCount(invocationNamespace: String,
+                                    fqn: FullyQualifiedEntityName,
+                                    revision: DocRevision): Future[Long] = {
+    val namespacePrefix = containerPrefix(ContainerKeys.namespacePrefix, invocationNamespace, fqn, Some(revision))
+    val inProgressPrefix = containerPrefix(ContainerKeys.inProgressPrefix, invocationNamespace, fqn, Some(revision))
+    val warmedPrefix = containerPrefix(ContainerKeys.warmedPrefix, invocationNamespace, fqn, Some(revision))
+
+    // the three lookups are independent: start all of them before combining, so they run concurrently
+    val namespaceCountF = etcdClient.getCount(namespacePrefix)
+    val inProgressCountF = etcdClient.getCount(inProgressPrefix)
+    val warmedCountF = etcdClient.getCount(warmedPrefix)
+
+    for {
+      namespaceCount <- namespaceCountF
+      inProgressCount <- inProgressCountF
+      warmedCount <- warmedCountF
+    } yield {
+      namespaceCount + inProgressCount + warmedCount
+    }
+  }
+
+  /** Initialize message consumers */
+  private val msgProvider = SpiLoader.get[MessagingProvider]
+  //number of peeked messages - increasing the concurrentPeekFactor improves concurrent usage, but adds risk for message loss in case of crash
+  private val maxPeek = loadConfigOrThrow[Int](ConfigKeys.containerCreationMaxPeek)
+
+  /** Builds a consumer; used both at startup and by enable() so the two are always configured alike. */
+  private def createConsumer(): ContainerMessageConsumer =
+    new ContainerMessageConsumer(
+      instance,
+      pool,
+      entityStore,
+      cfg,
+      msgProvider,
+      longPollDuration = 1.second,
+      maxPeek,
+      sendAckToScheduler)
+
+  // None while the invoker is disabled
+  private var consumer: Option[ContainerMessageConsumer] = Some(createConsumer())
+
+  /**
+   * Enables the invoker: re-enables the health manager and the pool, recreates the consumer if
+   * disable() closed it, and restarts the scheduler warm-up.
+   */
+  override def enable(): Route = {
+    invokerHealthManager ! Enable
+    pool ! Enable
+    // re-enable consumer
+    if (consumer.isEmpty)
+      consumer = Some(createConsumer())
+    warmUp()
+    complete("Success enable invoker")
+  }
+
+ // Disables the invoker: asks the health manager and the pool to shut down gracefully, closes the
+ // consumer and closes the scheduler watch. Both are reset to None so that enable() recreates them.
+ override def disable(): Route = {
+ invokerHealthManager ! GracefulShutdown
+ pool ! GracefulShutdown
+ consumer.foreach(_.close())
+ consumer = None
+ warmUpWatcher.foreach(_.close())
+ warmUpWatcher = None
+ complete("Successfully disabled invoker")
+ }
+
+ // Sends AdjustPrewarmedContainer to the pool and completes the request immediately.
+ override def backfillPrewarm(): Route = {
+ pool ! AdjustPrewarmedContainer
+ complete("backfilling prewarm container")
+ }
+
+ // Request sent to schedulers by warmUpScheduler. It is built once, so every warm-up call reuses
+ // the same generated transaction id.
+ private val warmUpFetchRequest = FetchRequest(
+ TransactionId(TransactionId.generateTid()).serialize,
+ InvokerHealthManager.healthActionIdentity.namespace.name.asString,
+ WarmUp.warmUpAction.serialize,
+ DocRevision.empty.serialize) // a warm up fetch request which contains
nothing
+
+  /**
+   * Warms up the gRPC connection to a scheduler by sending it the warm-up fetch request.
+   *
+   * The client is created for this single call and closed once the call has completed, whether it
+   * succeeded or failed.
+   *
+   * @param scheduler endpoints (host and rpc port) of the scheduler to contact
+   * @return the future of the fetch call
+   */
+  private def warmUpScheduler(scheduler: SchedulerEndpoints) = {
+    val setting =
+      GrpcClientSettings
+        .connectToServiceAt(scheduler.host, scheduler.rpcPort)
+        .withTls(grpcConfig.tls)
+    val client = ActivationServiceClient(setting)
+    client
+      .fetchActivation(warmUpFetchRequest)
+      .andThen {
+        case Success(_) =>
+          logging.info(this, s"Warmed up scheduler $scheduler")
+        case Failure(t) =>
+          // do not report a warm-up that never reached the scheduler as successful
+          logging.warn(this, s"Failed to warm up scheduler $scheduler: ${t.getMessage}")
+      }
+      .andThen {
+        case _ => client.close()
+      }
+  }
+
+  /**
+   * Warms up the gRPC connection to every scheduler registered in etcd.
+   *
+   * Starts a watch on the scheduler key prefix (unless one is already running) so that schedulers
+   * appearing later are warmed up as well, then lists the schedulers currently stored in etcd.
+   * `warmedSchedulers` ensures each key is handled once until its DELETE event is seen.
+   */
+  private def warmUp(): Unit = {
+    implicit val transId = TransactionId.warmUp
+    if (warmUpWatcher.isEmpty)
+      warmUpWatcher = Some(etcdClient.watch(SchedulerKeys.prefix, true) { res: WatchUpdate =>
+        res.getEvents.asScala.foreach { event =>
+          event.getType match {
+            case EventType.DELETE =>
+              val key = event.getPrevKv.getKey
+              warmedSchedulers.remove(key)
+            case EventType.PUT =>
+              val key = event.getKv.getKey
+              val value = event.getKv.getValue
+              SchedulerStates
+                .parse(value)
+                .map { state =>
+                  // warm up new scheduler
+                  warmedSchedulers.getOrElseUpdate(key, {
+                    logging.info(this, s"Warm up scheduler ${state.sid}")
+                    warmUpScheduler(state.endpoints)
+                    value
+                  })
+                }
+                // same handling as for the initial listing below: a bad scheduler state is reported
+                .recover {
+                  case t =>
+                    logging.error(this, s"Unexpected error $t")
+                }
+            case _ =>
+          }
+        }
+      })
+
+    etcdClient
+      .getPrefix(SchedulerKeys.prefix)
+      .map { res =>
+        res.getKvsList.asScala.foreach { kv =>
+          val scheduler = kv.getKey
+          warmedSchedulers.getOrElseUpdate(
+            scheduler, {
+              logging.info(this, s"Warm up scheduler $scheduler")
+              SchedulerStates
+                .parse(kv.getValue)
+                .map { state =>
+                  warmUpScheduler(state.endpoints)
+                }
+                .recover {
+                  case t =>
+                    logging.error(this, s"Unexpected error $t")
+                }
+
+              kv.getValue
+            })
+        }
+      }
+      .andThen {
+        // the result is not awaited by anyone, so a failed listing has to be reported here
+        case Failure(t) =>
+          logging.error(this, s"failed to get schedulers for warm up: ${t.getMessage}")
+      }
+  }
+ warmUp()
+}
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index 437beb3..6de751f 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -220,6 +220,7 @@ trait InvokerProvider extends Spi {
trait InvokerCore {
def enable(): Route
def disable(): Route
+ def backfillPrewarm(): Route
}
/**
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index 3177cb7..f2d36b1 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -307,4 +307,8 @@ class InvokerReactive(
complete("not supported")
}
+ // Prewarm backfilling is not implemented by this invoker; the route only answers "not supported".
+ override def backfillPrewarm(): Route = {
+ complete("not supported")
+ }
+
}
diff --git a/tests/src/test/resources/application.conf.j2
b/tests/src/test/resources/application.conf.j2
index 6b69172..352fa16 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -140,13 +140,15 @@ whisk {
scheduler {
protocol = "{{ scheduler.protocol }}"
+ grpc {
+ tls = "{{ scheduler.grpc.tls | default('false') | lower }}"
+ }
queue-manager {
max-scheduling-time = "{{
scheduler.queueManager.maxSchedulingTime }}"
max-retries-to-get-queue = "{{
scheduler.queueManager.maxRetriesToGetQueue }}"
}
max-peek = "{{ scheduler.maxPeek }}"
}
-
}
#test-only overrides so that tests can override defaults in application.conf
(todo: move all defaults to reference.conf)
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
index 9b91d55..c779320 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
@@ -117,7 +117,7 @@ class FunctionPullingContainerProxyTests
false,
1.second)
- val timeoutConfig = ContainerProxyTimeoutConfig(5.seconds, 5.seconds)
+ val timeoutConfig = ContainerProxyTimeoutConfig(5.seconds, 5.seconds,
5.seconds)
val messageTransId = TransactionId(TransactionId.testing.meta.id)
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
index 153e228..1095128 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
@@ -130,6 +130,10 @@ class TestInvokerReactive extends InvokerCore with
BasicHttpService {
complete("")
}
+ // Test stub: completes with an empty body and does nothing else.
+ override def backfillPrewarm(): Route = {
+ complete("")
+ }
+
def reset(): Unit = {
enableCount = 0
disableCount = 0
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
index 8cf8d91..e387cd6 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
@@ -129,6 +129,10 @@ class TestFPCInvokerReactive extends InvokerCore with
BasicHttpService {
complete("")
}
+ // Test stub: completes with an empty body and does nothing else.
+ override def backfillPrewarm(): Route = {
+ complete("")
+ }
+
def reset(): Unit = {
enableCount = 0
disableCount = 0