This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 6e883f9 Adding YARNContainerFactory. This allows OpenWhisk to run
actions on Apache Hadoop clusters. (#4129)
6e883f9 is described below
commit 6e883f9392aeb8e1ceced92bf08eab517109281f
Author: Sam Hjelmfelt <[email protected]>
AuthorDate: Sat Feb 23 06:04:56 2019 -0800
Adding YARNContainerFactory. This allows OpenWhisk to run actions on Apache
Hadoop clusters. (#4129)
---
common/scala/src/main/resources/application.conf | 12 +
.../org/apache/openwhisk/core/WhiskConfig.scala | 1 +
.../openwhisk/core/yarn/YARNComponentActor.scala | 107 +++++
.../openwhisk/core/yarn/YARNContainerFactory.scala | 241 ++++++++++
.../core/yarn/YARNContainerInfoActor.scala | 122 +++++
.../apache/openwhisk/core/yarn/YARNRESTUtil.scala | 200 +++++++++
.../org/apache/openwhisk/core/yarn/YARNTask.scala | 91 ++++
docs/yarn.md | 84 ++++
.../core/containerpool/yarn/test/MockYARNRM.scala | 247 +++++++++++
.../yarn/test/YARNContainerFactoryTests.scala | 493 +++++++++++++++++++++
10 files changed, 1598 insertions(+)
diff --git a/common/scala/src/main/resources/application.conf
b/common/scala/src/main/resources/application.conf
index cf8a3ee..747b27f 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -341,6 +341,18 @@ whisk {
}
}
+  # Settings for YARNContainerFactory (org.apache.openwhisk.core.yarn), read into YARNConfig from whisk.yarn.
+  yarn {
+    master-url="http://localhost:8088" //YARN Resource Manager endpoint to be accessed from the invoker
+    yarn-link-log-message=true //If true, display a link to YARN in the static log message, otherwise do not include a link to YARN.
+    service-name="openwhisk-action-service" //Name of the YARN Service created by the invoker. The invoker number will be appended.
+    auth-type="simple" //Authentication type for YARN (simple or kerberos)
+    kerberos-principal="" //Kerberos principal to use for the YARN service. Note: must include a hostname
+    kerberos-keytab="" //Location of keytab accessible by all node managers
+    queue="default" //Name of the YARN queue where the service will be created
+    memory=256 //Memory used by each YARN container (sent as the YARN resource.memory value; presumably MB - confirm)
+    cpus=1 //CPUs used by each YARN container
+  }
+
logstore {
#SplunkLogStore configuration
#splunk {
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index dcc9962..7fd0c98 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -233,6 +233,7 @@ object ConfigKeys {
val logStoreElasticSearch = s"$logStore.elasticsearch"
val mesos = "whisk.mesos"
+  val yarn: String = "whisk.yarn" // root of the YARNContainerFactory settings, loaded into YARNConfig
val containerProxy = "whisk.container-proxy"
val containerProxyTimeouts = s"$containerProxy.timeouts"
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNComponentActor.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNComponentActor.scala
new file mode 100644
index 0000000..78aeeb9
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNComponentActor.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.yarn
+
+import akka.actor.{Actor, ActorSystem}
+import akka.http.scaladsl.model.{HttpMethods, StatusCodes}
+import akka.stream.ActorMaterializer
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.entity.ExecManifest.ImageName
+import
org.apache.openwhisk.core.yarn.YARNComponentActor.{CreateContainerAsync,
RemoveContainer}
+import spray.json.{JsArray, JsNumber, JsObject, JsString}
+
+import scala.concurrent.ExecutionContext
+
+/** Messages understood by YARNComponentActor, which submits create and decommission commands to YARN. */
+object YARNComponentActor {
+
+  /** Request to grow this image's YARN component by one container. */
+  case object CreateContainerAsync
+
+  /** Request to decommission the named YARN component instance. */
+  case class RemoveContainer(component_instance_name: String)
+}
+
+/**
+ * Flexes one YARN service component (one per action image) up and down.
+ *
+ * Every request gets a reply: `()` on success, `akka.actor.Status.Failure` on error. Failures are replied
+ * rather than thrown so the asking future fails fast and the actor is not restarted. A restart would reset
+ * containerCount, and later flex requests would then use the wrong target size.
+ */
+class YARNComponentActor(actorSystem: ActorSystem,
+                         logging: Logging,
+                         yarnConfig: YARNConfig,
+                         serviceName: String,
+                         imageName: ImageName)
+    extends Actor {
+
+  implicit val as: ActorSystem = actorSystem
+  implicit val materializer: ActorMaterializer = ActorMaterializer()
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+
+  // YARNContainerFactory registers each image's component with '.' replaced by '-' (YARN component names
+  // must match [a-z][a-z0-9-]*). Use the same name here, otherwise requests target a non-existent component.
+  private val componentName: String = imageName.name.replace('.', '-')
+
+  //Adding a container via the YARN REST API is actually done by flexing the component's container pool to a certain size.
+  // This actor must track the current containerCount in order to make the correct scale-up request.
+  var containerCount: Int = 0
+
+  def receive: PartialFunction[Any, Unit] = {
+    case CreateContainerAsync =>
+      sender ! replyFor(createContainerAsync())
+
+    case RemoveContainer(component_instance_name) =>
+      sender ! replyFor(removeContainer(component_instance_name))
+
+    case input =>
+      sender ! akka.actor.Status.Failure(new IllegalArgumentException("Unknown input: " + input))
+  }
+
+  /** Runs a YARN request and turns its outcome into the reply for the asker: `()` or `Status.Failure`. */
+  private def replyFor(request: => Unit): Any =
+    try {
+      request
+    } catch {
+      case scala.util.control.NonFatal(e) => akka.actor.Status.Failure(e)
+    }
+
+  /**
+   * Asks YARN to grow the component to containerCount + 1 containers.
+   *
+   * @throws Exception via YARNRESTUtil.handleYARNRESTError if YARN does not answer 200 OK
+   */
+  def createContainerAsync(): Unit = {
+    logging.info(this, s"Using YARN to create a container with image ${imageName.name}...")
+
+    val body = JsObject("number_of_containers" -> JsNumber(containerCount + 1)).compactPrint
+    val response = YARNRESTUtil.submitRequestWithAuth(
+      yarnConfig.authType,
+      HttpMethods.PUT,
+      s"${yarnConfig.masterUrl}/app/v1/services/$serviceName/components/$componentName",
+      body)
+    response match {
+      case httpresponse(StatusCodes.OK, content) =>
+        logging.info(this, s"Added container: ${imageName.name}. Response: $content")
+        containerCount += 1
+
+      case errorResponse =>
+        // handleYARNRESTError returns a PartialFunction; it only logs/throws once applied to the response.
+        YARNRESTUtil.handleYARNRESTError(logging).apply(errorResponse)
+    }
+  }
+
+  /**
+   * Decommissions one component instance. It is a no-op (with a warning) when no containers are tracked.
+   *
+   * @throws Exception via YARNRESTUtil.handleYARNRESTError if YARN does not answer 200 OK
+   */
+  def removeContainer(component_instance_name: String): Unit = {
+    logging.info(this, s"Removing ${imageName.name} container: $component_instance_name ")
+    if (containerCount <= 0) {
+      logging.warn(this, "Already at 0 containers")
+    } else {
+      val body = JsObject(
+        "components" -> JsArray(
+          JsObject(
+            "name" -> JsString(componentName),
+            "decommissioned_instances" -> JsArray(JsString(component_instance_name))))).compactPrint
+      val response = YARNRESTUtil.submitRequestWithAuth(
+        yarnConfig.authType,
+        HttpMethods.PUT,
+        s"${yarnConfig.masterUrl}/app/v1/services/$serviceName",
+        body)
+      response match {
+        case httpresponse(StatusCodes.OK, content) =>
+          logging.info(
+            this,
+            s"Successfully removed ${imageName.name} container: $component_instance_name. Response: $content")
+          containerCount -= 1
+
+        case errorResponse =>
+          YARNRESTUtil.handleYARNRESTError(logging).apply(errorResponse)
+      }
+    }
+  }
+}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala
new file mode 100644
index 0000000..35d6c03
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.yarn
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.http.scaladsl.model.{HttpMethods, StatusCodes}
+import akka.pattern.ask
+import akka.util.Timeout
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.core.containerpool._
+import org.apache.openwhisk.core.entity.ExecManifest.ImageName
+import org.apache.openwhisk.core.entity.{ByteSize, ExecManifest,
InvokerInstanceId}
+import org.apache.openwhisk.core.yarn.YARNComponentActor.CreateContainerAsync
+import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
+import pureconfig.loadConfigOrThrow
+import spray.json._
+
+import scala.collection.immutable.HashMap
+import scala.concurrent.{blocking, ExecutionContext, Future}
+import scala.concurrent.duration._
+import YARNJsonProtocol._
+import akka.stream.ActorMaterializer
+
+/**
+ * Settings of the YARNContainerFactory, loaded from `whisk.yarn` (ConfigKeys.yarn).
+ *
+ * @param masterUrl          YARN Resource Manager endpoint accessed from the invoker
+ * @param yarnLinkLogMessage whether the static activation log message links to the YARN UI
+ * @param serviceName        base name of the YARN service; the factory appends the invoker number
+ * @param authType           "simple" or "kerberos" (YARNRESTUtil.SIMPLEAUTH / YARNRESTUtil.KERBEROSAUTH)
+ * @param kerberosPrincipal  principal of the YARN service when authType is kerberos
+ * @param kerberosKeytab     keytab location of the YARN service when authType is kerberos
+ * @param queue              YARN queue the service is submitted to
+ * @param memory             memory of each YARN container, passed verbatim into the component resource definition
+ * @param cpus               CPUs of each YARN container
+ */
+case class YARNConfig(
+  masterUrl: String,
+  yarnLinkLogMessage: Boolean,
+  serviceName: String,
+  authType: String,
+  kerberosPrincipal: String,
+  kerberosKeytab: String,
+  queue: String,
+  memory: String,
+  cpus: Int)
+
+/** SPI entry point, selected through `whisk.spi.ContainerFactoryProvider`, that runs actions in YARN containers. */
+object YARNContainerFactoryProvider extends ContainerFactoryProvider {
+  override def instance(actorSystem: ActorSystem,
+                        logging: Logging,
+                        config: WhiskConfig,
+                        instance: InvokerInstanceId,
+                        parameters: Map[String, Set[String]]): ContainerFactory =
+    new YARNContainerFactory(
+      actorSystem = actorSystem,
+      logging = logging,
+      config = config,
+      instance = instance,
+      parameters = parameters)
+}
+
+/**
+ * ContainerFactory that runs action containers as components of a per-invoker YARN service.
+ *
+ * init() creates one YARNComponentActor and one YARNContainerInfoActor per runtime image and (re)creates the
+ * YARN service. createContainer() flexes the image's component up and waits for a READY container.
+ */
+class YARNContainerFactory(actorSystem: ActorSystem,
+                           logging: Logging,
+                           config: WhiskConfig,
+                           instance: InvokerInstanceId,
+                           parameters: Map[String, Set[String]],
+                           containerArgs: ContainerArgsConfig =
+                             loadConfigOrThrow[ContainerArgsConfig](ConfigKeys.containerArgs),
+                           yarnConfig: YARNConfig = loadConfigOrThrow[YARNConfig](ConfigKeys.yarn))
+    extends ContainerFactory {
+
+  val images: Set[ImageName] = ExecManifest.runtimesManifest.runtimes.flatMap(a => a.versions.map(b => b.image))
+
+  //One actor of each type per image for parallelism
+  private var yarnComponentActors: Map[ImageName, ActorRef] = HashMap[ImageName, ActorRef]()
+  private var yarnContainerInfoActors: Map[ImageName, ActorRef] = HashMap[ImageName, ActorRef]()
+
+  val serviceStartTimeoutMS = 60000
+  val retryWaitMS = 1000
+  val runCommand = ""
+  val version = "1.0.0"
+  val description = "OpenWhisk Action Service"
+
+  //Allows for invoker HA
+  val serviceName: String = yarnConfig.serviceName + "-" + instance.toInt
+
+  val containerStartTimeoutMS = 60000
+
+  implicit val as: ActorSystem = actorSystem
+  implicit val materializer: ActorMaterializer = ActorMaterializer()
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+
+  /** Starts the per-image actors, then replaces any existing YARN service and blocks until the new one is stable. */
+  override def init(): Unit = {
+    yarnComponentActors = images.map { image =>
+      image -> actorSystem.actorOf(
+        Props(new YARNComponentActor(actorSystem, logging, yarnConfig, serviceName, image)),
+        name = s"YARNComponentActor-${image.name}")
+    }.toMap
+    yarnContainerInfoActors = images.map { image =>
+      image -> actorSystem.actorOf(
+        Props(new YARNContainerInfoActor(actorSystem, logging, yarnConfig, serviceName, image)),
+        name = s"YARNComponentInfoActor-${image.name}")
+    }.toMap
+
+    blocking {
+      //Remove service if it already exists
+      val existing =
+        YARNRESTUtil.downloadServiceDefinition(yarnConfig.authType, serviceName, yarnConfig.masterUrl)(logging)
+      if (existing != null)
+        removeService()
+
+      createService()
+    }
+  }
+
+  /**
+   * Flexes the YARN component for `actionImage` up by one and waits for a READY container.
+   * Returns a failed future (rather than throwing) when the image has no YARN component,
+   * e.g. an image that is not in the runtimes manifest.
+   */
+  override def createContainer(
+    unusedtid: TransactionId,
+    unusedname: String,
+    actionImage: ExecManifest.ImageName,
+    unuseduserProvidedImage: Boolean,
+    unusedmemory: ByteSize,
+    unusedcpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
+    implicit val timeout: Timeout = Timeout(containerStartTimeoutMS.milliseconds)
+
+    (yarnComponentActors.get(actionImage), yarnContainerInfoActors.get(actionImage)) match {
+      case (Some(componentActor), Some(infoActor)) =>
+        //First send the create command to YARN, then with a different actor, wait for the container to be ready
+        ask(componentActor, CreateContainerAsync).flatMap(_ =>
+          ask(infoActor, GetContainerInfo(componentActor)).mapTo[Container])
+
+      case _ =>
+        Future.failed(
+          new IllegalArgumentException(
+            s"No YARN component is configured for image ${actionImage.publicImageName}"))
+    }
+  }
+
+  /** Best-effort shutdown: removes the YARN service, logging any failure, and always stops the per-image actors. */
+  override def cleanup(): Unit = {
+    try {
+      removeService()
+    } catch {
+      case scala.util.control.NonFatal(e) =>
+        logging.error(this, s"Failed to remove YARN service $serviceName: $e")
+    } finally {
+      yarnComponentActors.values.foreach(actorSystem.stop)
+      yarnContainerInfoActors.values.foreach(actorSystem.stop)
+    }
+  }
+
+  /**
+   * Submits the YARN service and blocks until YARN reports it STABLE or STARTED.
+   *
+   * @throws Exception if YARN rejects the submission or the service is not stable within serviceStartTimeoutMS
+   */
+  def createService(): Unit = {
+    logging.info(this, "Creating Service with images: " + images.map(i => i.publicImageName).mkString(", "))
+
+    //Submit service
+    val response =
+      YARNRESTUtil.submitRequestWithAuth(
+        yarnConfig.authType,
+        HttpMethods.POST,
+        s"${yarnConfig.masterUrl}/app/v1/services",
+        buildServiceDefinition().toJson.compactPrint)
+
+    //Handle response
+    response match {
+      case httpresponse(StatusCodes.OK | StatusCodes.Accepted, content) =>
+        logging.info(this, s"Service submitted. Response: $content")
+
+      case errorResponse =>
+        // handleYARNRESTError returns a PartialFunction; it only logs/throws once applied to the response.
+        YARNRESTUtil.handleYARNRESTError(logging).apply(errorResponse)
+    }
+
+    waitForServiceStart()
+  }
+
+  /** Builds the service definition: one initially empty Docker component per runtime image. */
+  private def buildServiceDefinition(): ServiceDefinition = {
+    val componentList = images.map { i =>
+      ComponentDefinition(
+        i.name.replace('.', '-'), //name must be [a-z][a-z0-9-]*
+        Some(0), //start with zero containers
+        Some(runCommand),
+        Option.empty,
+        Some(ArtifactDefinition(i.publicImageName, "DOCKER")),
+        Some(ResourceDefinition(yarnConfig.cpus, yarnConfig.memory)),
+        Some(ConfigurationDefinition(Map(("YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE", "true")))),
+        List[String]())
+    }.toList
+
+    //Add kerberos def if necessary
+    val kerberosDef: Option[KerberosPrincipalDefinition] =
+      if (yarnConfig.authType.equals(YARNRESTUtil.KERBEROSAUTH))
+        Some(KerberosPrincipalDefinition(Some(yarnConfig.kerberosPrincipal), Some(yarnConfig.kerberosKeytab)))
+      else None
+
+    ServiceDefinition(
+      Some(serviceName),
+      Some(version),
+      Some(description),
+      Some("STABLE"),
+      Some(yarnConfig.queue),
+      componentList,
+      kerberosDef)
+  }
+
+  /** Polls the service every retryWaitMS, up to serviceStartTimeoutMS, until it is STABLE or STARTED. */
+  private def waitForServiceStart(): Unit = {
+    var started = false
+    var retryCount = 0
+    val maxRetryCount = serviceStartTimeoutMS / retryWaitMS
+    while (!started && retryCount < maxRetryCount) {
+      val serviceDef =
+        YARNRESTUtil.downloadServiceDefinition(yarnConfig.authType, serviceName, yarnConfig.masterUrl)(logging)
+
+      if (serviceDef == null) {
+        logging.info(this, "Service not found yet")
+        Thread.sleep(retryWaitMS)
+      } else {
+        serviceDef.state match {
+          case Some("STABLE" | "STARTED") =>
+            logging.info(this, "YARN service achieved stable state")
+            started = true
+
+          case state =>
+            logging.info(
+              this,
+              s"YARN service is not in stable state yet ($retryCount/$maxRetryCount). Current state: ${state.getOrElse("None")}")
+            Thread.sleep(retryWaitMS)
+        }
+      }
+      retryCount += 1
+    }
+    if (!started)
+      throw new Exception(s"After ${serviceStartTimeoutMS}ms YARN service did not achieve stable state")
+  }
+
+  /**
+   * Deletes the YARN service. 404 and 400 are treated as "did not exist".
+   *
+   * @throws Exception via YARNRESTUtil.handleYARNRESTError for any other non-OK response
+   */
+  def removeService(): Unit = {
+    val response: httpresponse =
+      YARNRESTUtil.submitRequestWithAuth(
+        yarnConfig.authType,
+        HttpMethods.DELETE,
+        s"${yarnConfig.masterUrl}/app/v1/services/$serviceName",
+        "")
+
+    response match {
+      case httpresponse(StatusCodes.OK, _) =>
+        logging.info(this, "YARN service Removed")
+
+      case httpresponse(StatusCodes.NotFound | StatusCodes.BadRequest, _) =>
+        logging.warn(this, "YARN service did not exist")
+
+      case errorResponse =>
+        YARNRESTUtil.handleYARNRESTError(logging).apply(errorResponse)
+    }
+  }
+}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerInfoActor.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerInfoActor.scala
new file mode 100644
index 0000000..4837940
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerInfoActor.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.yarn
+
+import akka.actor.{Actor, ActorRef, ActorSystem}
+import akka.stream.ActorMaterializer
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.containerpool.{ContainerAddress, ContainerId}
+import org.apache.openwhisk.core.entity.ExecManifest.ImageName
+
+import scala.collection.immutable.HashMap
+import scala.concurrent.ExecutionContext
+
+/**
+ * Asks a YARNContainerInfoActor for a ready container. The given YARNComponentActor is handed to the
+ * resulting YARNTask, which uses it to remove the container again.
+ */
+case class GetContainerInfo(yarnComponentActorRef: ActorRef)
+
+//This actor is separate from the YARNComponentActor so that container create commands can be issued in parallel
+/**
+ * Hands out READY YARN containers of one image's component, each at most once, wrapped in a YARNTask.
+ *
+ * Every request gets a reply: the YARNTask, or `akka.actor.Status.Failure`. Errors are replied rather than
+ * thrown. A thrown exception would restart the actor and wipe containersAllocated, which would let an
+ * in-use container be handed out a second time.
+ */
+class YARNContainerInfoActor(actorSystem: ActorSystem,
+                             logging: Logging,
+                             yarnConfig: YARNConfig,
+                             serviceName: String,
+                             imageName: ImageName)
+    extends Actor {
+
+  implicit val as: ActorSystem = actorSystem
+  implicit val materializer: ActorMaterializer = ActorMaterializer()
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+
+  val containerStartTimeoutMS = 60000
+  val retryWaitMS = 1000
+
+  // Component name as registered by YARNContainerFactory: '.' is replaced by '-' in YARN component names.
+  private val componentName: String = imageName.name.replace('.', '-')
+
+  //Map with the definition of all active containers
+  var containerDefMap: Map[String, ContainerDefinition] = new HashMap[String, ContainerDefinition]
+
+  //Map that keeps track of which containers have been returned to the main invoker for use
+  val containersAllocated = new scala.collection.mutable.HashMap[String, Boolean]
+
+  def receive: PartialFunction[Any, Unit] = {
+    case GetContainerInfo(yarnComponentActorRef) =>
+      val reply: Any =
+        try {
+          allocateContainer(yarnComponentActorRef)
+        } catch {
+          case scala.util.control.NonFatal(e) => akka.actor.Status.Failure(e)
+        }
+      sender ! reply
+
+    case input =>
+      sender ! akka.actor.Status.Failure(new IllegalArgumentException("Unknown input: " + input))
+  }
+
+  /**
+   * Waits, up to containerStartTimeoutMS, for a READY container that has not been handed out yet.
+   * It then marks that container allocated and wraps it in a YARNTask.
+   *
+   * @throws Exception if no container becomes available in time
+   */
+  private def allocateContainer(yarnComponentActorRef: ActorRef): YARNTask = {
+    //Check if there are any left over containers from the last check
+    var firstNewContainerName = nextUnallocated()
+
+    //If no containers are ready, wait for one to come up (up to containerStartTimeoutMS milliseconds)
+    var retryCount = 0
+    val maxRetryCount = containerStartTimeoutMS / retryWaitMS
+    while (firstNewContainerName.isEmpty && retryCount < maxRetryCount) {
+      //Get updated service def
+      val serviceDef =
+        YARNRESTUtil.downloadServiceDefinition(yarnConfig.authType, serviceName, yarnConfig.masterUrl)(logging)
+
+      //Update container list with new container details
+      if (serviceDef != null) {
+        refreshContainers(serviceDef)
+        firstNewContainerName = nextUnallocated()
+      }
+
+      //keep waiting
+      if (firstNewContainerName.isEmpty) {
+        retryCount += 1
+        Thread.sleep(retryWaitMS)
+        logging.info(this, s"Waiting for ${imageName.name} YARN container ($retryCount/$maxRetryCount)")
+      }
+    }
+
+    val containerName = firstNewContainerName.getOrElse(
+      throw new Exception(s"After ${containerStartTimeoutMS}ms ${imageName.name} YARN container was not available"))
+
+    //Return container
+    val newContainerDef = containerDefMap(containerName)
+    containersAllocated(containerName) = true
+
+    val containerAddress = ContainerAddress(newContainerDef.ip.getOrElse("127.0.0.1")) //default port is 8080
+    val containerId = ContainerId(newContainerDef.id)
+
+    logging.info(this, s"New ${imageName.name} YARN Container: ${newContainerDef.id}, $containerAddress")
+    new YARNTask(
+      containerId,
+      containerAddress,
+      ec,
+      logging,
+      as,
+      newContainerDef.component_instance_name,
+      imageName,
+      yarnConfig,
+      yarnComponentActorRef)
+  }
+
+  /** Name of a tracked READY container that has not been handed out yet, if any. */
+  private def nextUnallocated(): Option[String] =
+    containersAllocated.collectFirst { case (name, false) => name }
+
+  /** Replaces containerDefMap with this component's READY containers and syncs containersAllocated with it. */
+  private def refreshContainers(serviceDef: ServiceDefinition): Unit = {
+    containerDefMap = serviceDef.components
+      .filter(c => c.name.equals(componentName))
+      .flatMap(c => c.containers.getOrElse(List[ContainerDefinition]()))
+      .filter(containerDef => containerDef.state.equals("READY"))
+      .map(containerDef => (containerDef.component_instance_name, containerDef))
+      .toMap
+
+    //Filter map to only contain active containers
+    containersAllocated.retain((name, _) => containerDefMap.contains(name))
+    for (name <- containerDefMap.keys if !containersAllocated.contains(name))
+      containersAllocated.put(name, false)
+  }
+}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNRESTUtil.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNRESTUtil.scala
new file mode 100644
index 0000000..9279c1b
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNRESTUtil.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.yarn
+
+import java.nio.charset.StandardCharsets
+import java.security.Principal
+
+import akka.http.scaladsl.model.{HttpMethod, HttpMethods, StatusCode,
StatusCodes}
+import javax.security.sasl.AuthenticationException
+import org.apache.commons.io.IOUtils
+import org.apache.http.auth.{AuthSchemeProvider, AuthScope, Credentials}
+import org.apache.http.client.CredentialsProvider
+import org.apache.http.client.config.AuthSchemes
+import org.apache.http.client.methods._
+import org.apache.http.config.RegistryBuilder
+import org.apache.http.entity.StringEntity
+import org.apache.http.impl.auth.SPNegoSchemeFactory
+import org.apache.http.impl.client.{CloseableHttpClient, HttpClientBuilder,
HttpClients}
+import org.apache.openwhisk.common.Logging
+import spray.json._
+
+// Payloads of the YARN Services REST API (/app/v1/services). Field names mirror the JSON keys,
+// because YARNJsonProtocol derives the JSON formats from them.
+
+/** Kerberos identity of the YARN service (`kerberos_principal` in ServiceDefinition). */
+case class KerberosPrincipalDefinition(principal_name: Option[String], keytab: Option[String])
+
+/** YARN response body carrying a `diagnostics` message. */
+case class YARNResponseDefinition(diagnostics: String)
+
+/** Environment variables set in a component's containers. */
+case class ConfigurationDefinition(env: Map[String, String])
+
+/** What a component runs: `id` is the image and `type` the artifact kind (e.g. "DOCKER"). */
+case class ArtifactDefinition(id: String, `type`: String)
+
+/** Resources of each container of a component. */
+case class ResourceDefinition(cpus: Int, memory: String)
+
+/** One container of a component, as reported by YARN; `state` is e.g. "READY". */
+case class ContainerDefinition(
+  ip: Option[String],
+  bare_host: Option[String],
+  component_instance_name: String,
+  hostname: Option[String],
+  id: String,
+  launch_time: Long,
+  state: String)
+
+/** A YARN service component; YARNContainerFactory creates one per action runtime image. */
+case class ComponentDefinition(
+  name: String,
+  number_of_containers: Option[Int],
+  launch_command: Option[String],
+  containers: Option[List[ContainerDefinition]],
+  artifact: Option[ArtifactDefinition],
+  resource: Option[ResourceDefinition],
+  configuration: Option[ConfigurationDefinition],
+  decommissioned_instances: List[String])
+
+/** A YARN service as submitted to, and returned by, the services endpoint. */
+case class ServiceDefinition(
+  name: Option[String],
+  version: Option[String],
+  description: Option[String],
+  state: Option[String],
+  queue: Option[String],
+  components: List[ComponentDefinition],
+  kerberos_principal: Option[KerberosPrincipalDefinition])
+
+/**
+ * spray-json formats for the YARN Services REST payloads.
+ * Keep the declaration order: each format captures the implicit formats of its nested types when it is
+ * initialized, so those must be declared above it.
+ */
+object YARNJsonProtocol extends DefaultJsonProtocol {
+  implicit val KerberosPrincipalDefinitionFormat: RootJsonFormat[KerberosPrincipalDefinition] =
+    jsonFormat2(KerberosPrincipalDefinition.apply)
+  implicit val YARNResponseDefinitionFormat: RootJsonFormat[YARNResponseDefinition] =
+    jsonFormat1(YARNResponseDefinition.apply)
+  implicit val configurationDefinitionFormat: RootJsonFormat[ConfigurationDefinition] =
+    jsonFormat1(ConfigurationDefinition.apply)
+  implicit val artifactDefinitionFormat: RootJsonFormat[ArtifactDefinition] =
+    jsonFormat2(ArtifactDefinition.apply)
+  implicit val resourceDefinitionFormat: RootJsonFormat[ResourceDefinition] =
+    jsonFormat2(ResourceDefinition.apply)
+  implicit val containerDefinitionFormat: RootJsonFormat[ContainerDefinition] =
+    jsonFormat7(ContainerDefinition.apply)
+  implicit val componentDefinitionFormat: RootJsonFormat[ComponentDefinition] =
+    jsonFormat8(ComponentDefinition.apply)
+  implicit val serviceDefinitionFormat: RootJsonFormat[ServiceDefinition] =
+    jsonFormat7(ServiceDefinition.apply)
+}
+import YARNJsonProtocol._
+
+/** Status code and body of a YARN REST call, as returned by YARNRESTUtil.submitRequestWithAuth. */
+case class httpresponse(statusCode: StatusCode, content: String)
+
+/** Helpers for calling the YARN Services REST API with simple or Kerberos (SPNEGO) authentication. */
+object YARNRESTUtil {
+  val SIMPLEAUTH = "simple"
+  val KERBEROSAUTH = "kerberos"
+
+  /**
+   * Fetches the definition of a YARN service.
+   *
+   * @return the parsed definition, or null if YARN answered 404 (service not found)
+   * @throws AuthenticationException on 401/403, and Exception on any other non-OK response (via handleYARNRESTError)
+   */
+  def downloadServiceDefinition(authType: String, serviceName: String, masterUrl: String)(
+    implicit logging: Logging): ServiceDefinition = {
+    val response: httpresponse =
+      YARNRESTUtil.submitRequestWithAuth(authType, HttpMethods.GET, s"$masterUrl/app/v1/services/$serviceName", "")
+
+    response match {
+      case httpresponse(StatusCodes.OK, content) =>
+        content.parseJson.convertTo[ServiceDefinition]
+
+      case httpresponse(StatusCodes.NotFound, content) =>
+        logging.info(this, s"Service not found. Response: $content")
+        null
+
+      case errorResponse =>
+        // The handler is a PartialFunction and must be applied; every one of its cases throws.
+        handleYARNRESTError(logging).apply(errorResponse)
+        null
+    }
+  }
+
+  /**
+   * Sends a JSON request to the YARN REST API and returns its status code and body.
+   *
+   * With SIMPLEAUTH the current OS user is passed as the `user.name` query parameter. With KERBEROSAUTH,
+   * SPNEGO is used with JAAS credentials. Exceptions while executing the request are reported as a 500
+   * httpresponse. The HTTP client and response are always closed.
+   *
+   * @throws IllegalArgumentException for an unsupported auth type or HTTP method
+   */
+  def submitRequestWithAuth(authType: String, httpMethod: HttpMethod, URL: String, body: String): httpresponse = {
+    val client = createClient(authType)
+    try {
+      val requestURL = if (authType == SIMPLEAUTH) withUserName(URL) else URL
+      val request = createRequest(httpMethod, requestURL, body)
+      request.addHeader("content-type", "application/json")
+      execute(client, request)
+    } finally {
+      client.close()
+    }
+  }
+
+  /** Builds an HTTP client for the given auth type, or throws IllegalArgumentException for an unknown one. */
+  private def createClient(authType: String): CloseableHttpClient = authType match {
+    case SIMPLEAUTH =>
+      HttpClientBuilder.create.build
+
+    case KERBEROSAUTH =>
+      //System.setProperty("sun.security.krb5.debug", "true")
+      //System.setProperty("sun.security.spnego.debug", "true")
+      System.setProperty("javax.security.auth.useSubjectCredsOnly", "false")
+
+      //Null credentials result in jaas login
+      val useJaaS = new CredentialsProvider {
+        override def setCredentials(authscope: AuthScope, credentials: Credentials): Unit = {}
+
+        override def getCredentials(authscope: AuthScope): Credentials = new Credentials() {
+          def getPassword: String = null
+          def getUserPrincipal: Principal = null
+        }
+        override def clear(): Unit = {}
+      }
+
+      val authSchemeRegistry = RegistryBuilder
+        .create[AuthSchemeProvider]()
+        .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory())
+        .build()
+
+      HttpClients
+        .custom()
+        .setDefaultAuthSchemeRegistry(authSchemeRegistry)
+        .setDefaultCredentialsProvider(useJaaS)
+        .build()
+
+    case other =>
+      throw new IllegalArgumentException(s"Unsupported YARN authentication type: $other")
+  }
+
+  /** Appends the URL-encoded current OS user as the `user.name` query parameter used by YARN simple auth. */
+  private def withUserName(url: String): String = {
+    val separator = if (url.contains("?")) "&" else "?"
+    val user = java.net.URLEncoder.encode(System.getProperty("user.name"), StandardCharsets.UTF_8.name)
+    s"$url${separator}user.name=$user"
+  }
+
+  /** Creates the request for a supported HTTP method; POST and PUT carry `body` as UTF-8. */
+  private def createRequest(httpMethod: HttpMethod, url: String, body: String): HttpRequestBase =
+    httpMethod match {
+      case HttpMethods.GET =>
+        new HttpGet(url)
+
+      case HttpMethods.POST =>
+        val post = new HttpPost(url)
+        post.setEntity(new StringEntity(body, StandardCharsets.UTF_8.toString))
+        post
+
+      case HttpMethods.PUT =>
+        val put = new HttpPut(url)
+        put.setEntity(new StringEntity(body, StandardCharsets.UTF_8.toString))
+        put
+
+      case HttpMethods.DELETE =>
+        new HttpDelete(url)
+
+      case _ =>
+        throw new IllegalArgumentException(s"Unsupported HTTP method: $httpMethod")
+    }
+
+  /** Executes the request and always closes the response; any exception becomes a 500 httpresponse. */
+  private def execute(client: CloseableHttpClient, request: HttpRequestBase): httpresponse =
+    try {
+      val response = client.execute(request)
+      try {
+        // Responses without a body (e.g. 204 No Content) have no entity
+        val entity = response.getEntity
+        val responseBody =
+          if (entity == null) "" else IOUtils.toString(entity.getContent, StandardCharsets.UTF_8)
+        httpresponse(response.getStatusLine.getStatusCode, responseBody)
+      } finally {
+        response.close()
+      }
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        httpresponse(500, Option(e.getMessage).getOrElse(e.toString))
+    }
+
+  /**
+   * Logs and throws for a non-successful YARN response. Every case throws, so callers must apply the
+   * returned function to the response: `handleYARNRESTError(logging).apply(response)`.
+   *
+   * @throws AuthenticationException for 401 and 403 responses
+   * @throws Exception for any other response
+   */
+  def handleYARNRESTError(implicit logging: Logging): PartialFunction[httpresponse, Unit] = {
+
+    case httpresponse(StatusCodes.Unauthorized, content) =>
+      logging.error(
+        this,
+        s"Received 401 (Authentication Required) response code from YARN. Check authentication. Response: $content")
+      throw new AuthenticationException(
+        s"Received 401 (Authentication Required) response code from YARN. Check authentication. Response: $content")
+
+    case httpresponse(StatusCodes.Forbidden, content) =>
+      logging.error(this, s"Received 403 (Forbidden) response code from YARN. Check authentication. Response: $content")
+      throw new AuthenticationException(
+        s"Received 403 (Forbidden) response code from YARN. Check authentication. Response: $content")
+
+    case httpresponse(code, content) =>
+      logging.error(this, "Unknown response from YARN")
+      throw new Exception(s"Unknown response from YARN. Code: ${code.intValue()}, content: $content")
+  }
+}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNTask.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNTask.scala
new file mode 100644
index 0000000..d7fcf1f
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNTask.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.yarn
+
+import java.time.Instant
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.stream.scaladsl.Source
+import akka.util.{ByteString, Timeout}
+import spray.json._
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.core.containerpool.{Container, ContainerAddress,
ContainerId}
+import org.apache.openwhisk.core.containerpool.logging.LogLine
+import org.apache.openwhisk.core.entity.ByteSize
+import org.apache.openwhisk.core.entity.ExecManifest.ImageName
+import org.apache.openwhisk.core.yarn.YARNComponentActor.RemoveContainer
+import akka.pattern.ask
+import scala.concurrent.duration._
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+ * YARNTask implementation of Container.
+ * Differences from DockerContainer include:
+ * - does not launch container using docker cli, but rather the YARN framework
+ * - does not support pause/resume
+ * - does not support log collection (currently), but does provide a message indicating logs can be viewed via YARN UI
+ *   (external log collection and retrieval must be enabled via LogStore SPI to expose logs to wsk cli)
+ */
+class YARNTask(override protected val id: ContainerId,
+               override protected val addr: ContainerAddress,
+               override protected val ec: ExecutionContext,
+               override protected val logging: Logging,
+               override protected val as: ActorSystem,
+               val component_instance_name: String,
+               imageName: ImageName,
+               yarnConfig: YARNConfig,
+               yarnComponentActor: ActorRef)
+    extends Container {
+
+  val containerRemoveTimeoutMS = 60000
+
+  /** Stops the container from consuming CPU cycles. */
+  override def suspend()(implicit transid: TransactionId): Future[Unit] = {
+    // suspend not supported (just return result from super)
+    super.suspend()
+  }
+
+  /** Dual of halt. */
+  override def resume()(implicit transid: TransactionId): Future[Unit] = {
+    // resume not supported
+    Future.successful(())
+  }
+
+  /**
+   * Completely destroys this instance of the container.
+   *
+   * Runs the base Container teardown, then asks the YARNComponentActor to decommission this component
+   * instance. The returned future fails if the actor replies with a failure or does not answer within
+   * containerRemoveTimeoutMS.
+   */
+  override def destroy()(implicit transid: TransactionId): Future[Unit] = {
+    // NOTE(review): the base implementation is expected to release the action's HTTP connection - confirm in Container.
+    super.destroy()
+
+    implicit val timeout: Timeout = Timeout(containerRemoveTimeoutMS.milliseconds)
+    ask(yarnComponentActor, RemoveContainer(component_instance_name)).mapTo[Unit]
+  }
+
+  // NOTE: yarnConfig.serviceName is the base name; YARNContainerFactory appends the invoker number to it.
+  private val linkedLogMsg =
+    s"Logs are not collected from YARN containers currently. " +
+      s"You can browse the logs for YARN Service ${yarnConfig.serviceName} using the yarn UI at ${yarnConfig.masterUrl}"
+  private val noLinkLogMsg = "Log collection is not configured correctly, check with your service administrator."
+  private val logMsg = if (yarnConfig.yarnLinkLogMessage) linkedLogMsg else noLinkLogMsg
+
+  /**
+   * Returns a single static log line; `limit` and `waitForSentinel` are ignored because logs are not collected.
+   * By default the line points to the YARN UI. To emit a generic message without the link, set
+   * whisk.yarn.yarn-link-log-message=false
+   */
+  override def logs(limit: ByteSize, waitForSentinel: Boolean)(
+    implicit transid: TransactionId): Source[ByteString, Any] =
+    Source.single(ByteString(LogLine(logMsg, "stdout", Instant.now.toString).toJson.compactPrint))
+}
diff --git a/docs/yarn.md b/docs/yarn.md
new file mode 100644
index 0000000..389f1e5
--- /dev/null
+++ b/docs/yarn.md
@@ -0,0 +1,84 @@
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+# YARN Support
+
+The `YARNContainerFactory` enables launching action containers within a YARN cluster. It does not affect the deployment of OpenWhisk components (invoker, controller).
+
+## Enable
+
+To enable the `YARNContainerFactory`, set the following Typesafe Config properties. The `whisk.yarn.*` keys are listed in camelCase, matching the `CONFIG_` environment variables shown below; in HOCON files such as `application.conf` they are written in kebab-case (e.g. `whisk.yarn.master-url`, `whisk.yarn.yarn-link-log-message`).
+
+| property | required | details | example |
+| --- | --- | --- | --- |
+| `whisk.spi.ContainerFactoryProvider` | required | enable the YARNContainerFactory | org.apache.openwhisk.core.yarn.YARNContainerFactoryProvider |
+| `whisk.yarn.masterUrl` | required | YARN Resource Manager endpoint to be accessed from the invoker | http://localhost:8088 |
+| `whisk.yarn.yarnLinkLogMessage` | optional (default true) | If true, the static log message returned for each activation (when using the default LogStore) includes a link to the YARN UI; otherwise a generic message without a link is returned | true |
+| `whisk.yarn.serviceName` | optional (default openwhisk-action-service) | Name of the YARN Service created by the invoker. The invoker number will be appended. | openwhisk-action-service |
+| `whisk.yarn.authType` | optional (default simple) | Authentication type for YARN | simple or kerberos |
+| `whisk.yarn.kerberosPrincipal` | optional (default "") | Kerberos principal to use for the YARN service. Note: must include a hostname | user1/hostA@REALM |
+| `whisk.yarn.kerberosKeytabURI` | optional (default "") | Location of keytab accessible by all node managers | hdfs:/user/user1/user1_hostA.keytab |
+| `whisk.yarn.queue` | optional (default "default") | Name of the YARN queue where the service will be created | default |
+| `whisk.yarn.memory` | optional (default 256) | Memory used by each YARN container | 256 |
+| `whisk.yarn.cpus` | optional (default 1) | CPUs used by each YARN container | 1 |
+
+To set these properties for your invoker, set the corresponding environment variables, e.g.,
+```properties
+CONFIG_whisk_spi_ContainerFactoryProvider=org.apache.openwhisk.core.yarn.YARNContainerFactoryProvider
+CONFIG_whisk_yarn_masterUrl=http://localhost:8088
+CONFIG_whisk_yarn_yarnLinkLogMessage=true
+CONFIG_whisk_yarn_serviceName=openwhisk-action-service
+CONFIG_whisk_yarn_authType=simple
+
+CONFIG_whisk_yarn_queue=default
+CONFIG_whisk_yarn_memory=256
+CONFIG_whisk_yarn_cpus=1
+```
+
+## HA
+HA is supported. Each invoker creates its own YARN service, with its invoker number appended to the configured service name (e.g. `openwhisk-action-service-0`).
+
+## Security
+By default, OpenWhisk does not use Kerberos when communicating with YARN. Optionally, Kerberos/SPNEGO authentication can be used via JAAS with a few steps:
+* Set `whisk.yarn.authType` to "kerberos".
+* Set the `kerberosPrincipal` and `kerberosKeytabURI` properties. These are used by the YARN service.
+* Mount the krb5.conf, login.conf, and keytab files into the invoker's docker container. For example:
+  * -v "/etc/krb5.conf:/etc/krb5.conf"
+  * -v "/home/user1/login.conf:/login.conf"
+  * -v "/home/user1/user1.keytab:/user1.keytab"
+* Run the invoker with the following Java settings (e.g. via the INVOKER_OPTS environment variable):
+  * -Djava.security.auth.login.config={Path to login.conf file}
+  * -Djava.security.krb5.conf={Path to krb5.conf file}
+
+Example login.conf:
+```
+com.sun.security.jgss.initiate {
+ com.sun.security.auth.module.Krb5LoginModule required
+ useKeyTab=true
+ storeKey=true
+ doNotPrompt=true
+ keyTab="/user1.keytab"
+ principal="user1/hostA@REALM";
+ };
+```
+
+## Known Issues
+
+* Logs are not collected from action containers.
+
+ For now, the logs retrieved via the wsk CLI contain a static message that (with `yarnLinkLogMessage` enabled) includes the YARN Resource Manager URL. Once log retrieval from external sources is enabled, logs from YARN containers would have to be routed to the external source, and then retrieved from that source.
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/yarn/test/MockYARNRM.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/yarn/test/MockYARNRM.scala
new file mode 100644
index 0000000..1fcb799
--- /dev/null
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/yarn/test/MockYARNRM.scala
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.yarn.test
+
+import java.net.InetSocketAddress
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.concurrent.atomic.AtomicLong
+
+import akka.http.scaladsl.model.DateTime
+import com.sun.net.httpserver.{HttpExchange, HttpServer}
+import org.apache.openwhisk.core.yarn.YARNJsonProtocol._
+import org.apache.openwhisk.core.yarn.{YARNResponseDefinition, _}
+import spray.json._
+
+import scala.collection.mutable
+import scala.util.Random
+
+/**
+ * Minimal in-process mock of the Hadoop YARN Resource Manager's services REST API (`/app/v1/services`),
+ * used by the YARN container factory tests.
+ *
+ * Only simple authentication is supported: a request without a non-empty `user.name` query parameter
+ * is rejected with 403.
+ *
+ * Asynchronous YARN behaviour is simulated with timestamps: a created service reports `STABLE`, and
+ * containers added by a flex request move from `INIT` to `READY`, once `delayMS` has elapsed. State is
+ * only advanced when a service is read via GET (see `updateServDef`).
+ *
+ * @param port    local port the HTTP server binds to (bound on construction)
+ * @param delayMS simulated time, in milliseconds, for service creation and container start
+ */
+class MockYARNRM(port: Int, delayMS: Int) {
+  import scala.util.control.NonFatal
+
+  val services: mutable.Map[String, ServiceDefinition] = mutable.Map[String, ServiceDefinition]()
+  val initCompletionTimes: mutable.Map[String, DateTime] = mutable.Map[String, DateTime]()
+  val flexCompletionTimes: mutable.Map[String, mutable.Map[String, DateTime]] =
+    mutable.Map[String, mutable.Map[String, DateTime]]()
+
+  private val server = HttpServer.create(new InetSocketAddress(port), -1)
+  val POST = "POST"
+  val GET = "GET"
+  val PUT = "PUT"
+  val DELETE = "DELETE"
+  private val container_instance_number = new AtomicLong(0)
+
+  // Compiled once rather than on every request.
+  private val ServicesPath = "/app/v1/services"
+  private val ServicePattern = "/app/v1/services/([a-z-0-9]+)".r
+  private val FlexUrlPattern = "/app/v1/services/([a-z-0-9]+)/components/([a-z-0-9]+)".r
+
+  server.createContext(ServicesPath, (httpExchange: HttpExchange) => handle(httpExchange))
+  server.setExecutor(null) // creates a default executor
+
+  def start(): Unit = server.start()
+
+  def stop(): Unit = server.stop(0)
+
+  /** Authenticates the request, then dispatches it on (HTTP method, path); any non-fatal failure becomes a 500. */
+  private def handle(httpExchange: HttpExchange): Unit =
+    try {
+      if (getUserName(httpExchange).isEmpty) {
+        writeResponse(httpExchange, 403, "Username not provided")
+      } else {
+        (httpExchange.getRequestMethod, httpExchange.getRequestURI.getPath) match {
+          case (POST, ServicesPath)                  => createService(httpExchange)
+          case (GET, ServicePattern(serviceName))    => getService(httpExchange, serviceName)
+          case (PUT, ServicePattern(serviceName))    => decommissionInstance(httpExchange, serviceName)
+          case (DELETE, ServicePattern(serviceName)) => deleteService(httpExchange, serviceName)
+          case (PUT, FlexUrlPattern(serviceName, componentName)) =>
+            flexComponent(httpExchange, serviceName, componentName)
+          case (_, _) =>
+            writeResponse(httpExchange, 404, YARNResponseDefinition("Invalid request"))
+        }
+      }
+    } catch {
+      case NonFatal(exception) =>
+        writeResponse(httpExchange, 500, YARNResponseDefinition("Unknown error: " + exception.getMessage))
+    }
+
+  /** POST /app/v1/services: registers the service in ACCEPTED state; 400 if unnamed or already present. */
+  private def createService(httpExchange: HttpExchange): Unit = {
+    val servDef = readBody(httpExchange).parseJson.convertTo[ServiceDefinition]
+    servDef.name match {
+      case Some(serviceName) if services.contains(serviceName) =>
+        writeResponse(httpExchange, 400, YARNResponseDefinition("Invalid request. Service already exists"))
+      case Some(serviceName) =>
+        services.put(serviceName, servDef.copy(state = Some("ACCEPTED")))
+        initCompletionTimes.put(serviceName, DateTime.now.plus(delayMS))
+        flexCompletionTimes.put(serviceName, mutable.Map[String, DateTime]())
+        writeResponse(httpExchange, 200, YARNResponseDefinition("Creating Service"))
+      case None =>
+        writeResponse(httpExchange, 400, YARNResponseDefinition("Invalid request. Service name not specified"))
+    }
+  }
+
+  /** GET /app/v1/services/{name}: advances simulated state, then returns the service definition as JSON. */
+  private def getService(httpExchange: HttpExchange, serviceName: String): Unit =
+    if (!services.contains(serviceName)) {
+      writeResponse(httpExchange, 404, YARNResponseDefinition("Service not found"))
+    } else {
+      updateServDef(serviceName)
+      writeResponse(httpExchange, 200, services(serviceName).toJson.compactPrint)
+    }
+
+  /**
+   * PUT /app/v1/services/{name}: removes the first instance listed in `decommissioned_instances` of the
+   * first component in the request body, and decrements that component's container count.
+   */
+  private def decommissionInstance(httpExchange: HttpExchange, serviceName: String): Unit = {
+    val incomingComponentDef = readBody(httpExchange).parseJson.convertTo[ServiceDefinition].components.head
+    services.get(serviceName) match {
+      case None =>
+        writeResponse(httpExchange, 404, YARNResponseDefinition("Service not found"))
+      case Some(serviceDef) =>
+        serviceDef.components.find(_.name == incomingComponentDef.name) match {
+          case None =>
+            writeResponse(httpExchange, 404, YARNResponseDefinition("Component not found"))
+          case Some(componentDef) =>
+            val containerToRemove = incomingComponentDef.decommissioned_instances.head
+            val remaining = componentDef.containers
+              .getOrElse(List[ContainerDefinition]())
+              .filterNot(_.component_instance_name == containerToRemove)
+            val newComponentDef = componentDef.copy(
+              number_of_containers = componentDef.number_of_containers.map(_ - 1),
+              containers = Option(remaining))
+            val otherComponents = serviceDef.components.filterNot(_.name == componentDef.name)
+            services.put(serviceName, serviceDef.copy(components = otherComponents :+ newComponentDef))
+            writeResponse(
+              httpExchange,
+              200,
+              YARNResponseDefinition(s"Service $serviceName has successfully decommissioned instances."))
+        }
+    }
+  }
+
+  /** DELETE /app/v1/services/{name}: forgets the service and its simulated timings. */
+  private def deleteService(httpExchange: HttpExchange, serviceName: String): Unit =
+    if (services.remove(serviceName).isEmpty) {
+      writeResponse(httpExchange, 404, YARNResponseDefinition("Service not found"))
+    } else {
+      initCompletionTimes.remove(serviceName)
+      flexCompletionTimes.remove(serviceName)
+      writeResponse(httpExchange, 200, YARNResponseDefinition("Service deleted"))
+    }
+
+  /**
+   * PUT /app/v1/services/{name}/components/{component}: resizes the component to `number_of_containers`.
+   * Growing appends one INIT container per missing slot; shrinking drops the newest containers; an
+   * unchanged size leaves the container list untouched.
+   */
+  private def flexComponent(httpExchange: HttpExchange, serviceName: String, componentName: String): Unit = {
+    val requestedSize = readBody(httpExchange).parseJson.asJsObject.fields.get("number_of_containers")
+    (services.get(serviceName), flexCompletionTimes.get(serviceName)) match {
+      case (Some(serviceDef), Some(flexTimes)) =>
+        requestedSize match {
+          case Some(JsNumber(size)) =>
+            serviceDef.components.find(_.name == componentName) match {
+              case None =>
+                writeResponse(httpExchange, 400, YARNResponseDefinition("Invalid request. Component does not exist"))
+              case Some(componentDef) =>
+                val newSize = size.toInt
+                val originalSize = componentDef.number_of_containers.getOrElse(0)
+                val existing = componentDef.containers.getOrElse(List[ContainerDefinition]())
+                val containerList =
+                  if (newSize > originalSize) {
+                    // The new containers report READY once delayMS has elapsed (see updateServDef).
+                    flexTimes.put(componentName, DateTime.now.plus(delayMS))
+                    existing ++ List.fill(newSize - originalSize)(newContainer(componentName))
+                  } else {
+                    existing.dropRight(originalSize - newSize)
+                  }
+                val newComponentDef =
+                  componentDef.copy(number_of_containers = Some(newSize), containers = Option(containerList))
+                val otherComponents = serviceDef.components.filterNot(_.name == componentName)
+                services.put(serviceName, serviceDef.copy(components = otherComponents :+ newComponentDef))
+                writeResponse(
+                  httpExchange,
+                  200,
+                  YARNResponseDefinition(s"Updating component ($componentName) size from $originalSize to $newSize"))
+            }
+          case _ =>
+            writeResponse(
+              httpExchange,
+              400,
+              YARNResponseDefinition("Invalid request. number_of_containers not specified"))
+        }
+      case _ =>
+        writeResponse(httpExchange, 404, YARNResponseDefinition("Service not found"))
+    }
+  }
+
+  /** A fresh container in INIT state with a unique instance name (`<component>-<n>`) and random id. */
+  private def newContainer(componentName: String): ContainerDefinition =
+    ContainerDefinition(
+      Some("127.0.0.1"),
+      Option(""),
+      componentName + "-" + container_instance_number.getAndIncrement(),
+      Option(""),
+      Random.alphanumeric.take(10).mkString,
+      0,
+      "INIT")
+
+  /**
+   * Updates component and service states based on the completion-time maps: the service becomes STABLE
+   * once its init time has passed, and INIT containers become READY once their component's flex time has
+   * passed (or immediately, when the component was never flexed).
+   *
+   * @throws IllegalArgumentException if the service is unknown
+   */
+  def updateServDef(serviceName: String): Unit = {
+    val serviceDef =
+      services.getOrElse(serviceName, throw new IllegalArgumentException("Invalid serviceName: " + serviceName))
+    val now = DateTime.now
+    val withState =
+      if (initCompletionTimes(serviceName) < now) serviceDef.copy(state = Some("STABLE")) else serviceDef
+    val flexTimes = flexCompletionTimes.getOrElse(serviceName, mutable.Map[String, DateTime]())
+
+    val updatedComponents = withState.components.map { comp =>
+      val flexDone = flexTimes.getOrElse(comp.name, DateTime.MinValue) < now
+      val updatedContainers = comp.containers
+        .getOrElse(List[ContainerDefinition]())
+        .map(container => if (container.state == "INIT" && flexDone) container.copy(state = "READY") else container)
+      comp.copy(containers = Option(updatedContainers))
+    }
+    services.put(serviceName, withState.copy(components = updatedComponents))
+  }
+
+  /** Returns the `user.name` query parameter, or "" when it (or the whole query string) is absent. */
+  private def getUserName(httpExchange: HttpExchange): String = {
+    val props = new util.HashMap[String, String]
+    // getQuery returns null when the request URI has no query string at all.
+    Option(httpExchange.getRequestURI.getQuery).foreach { query =>
+      query.split("&").foreach { param =>
+        val entry = param.split("=")
+        if (entry.nonEmpty)
+          props.put(entry(0), if (entry.length > 1) entry(1) else "")
+      }
+    }
+    props.getOrDefault("user.name", "")
+  }
+
+  /** Reads the whole request body as UTF-8. */
+  private def readBody(httpExchange: HttpExchange): String =
+    scala.io.Source.fromInputStream(httpExchange.getRequestBody, StandardCharsets.UTF_8.name).mkString
+
+  private def writeResponse(t: HttpExchange, code: Int, content: YARNResponseDefinition): Unit =
+    writeResponse(t, code, content.toJson.compactPrint)
+
+  /** Sends `code` with `content` encoded as UTF-8, always closing the response stream. */
+  private def writeResponse(t: HttpExchange, code: Int, content: String): Unit = {
+    val bytes = content.getBytes(StandardCharsets.UTF_8)
+    t.sendResponseHeaders(code, bytes.length)
+    val os = t.getResponseBody
+    try os.write(bytes)
+    finally os.close()
+  }
+}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/yarn/test/YARNContainerFactoryTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/yarn/test/YARNContainerFactoryTests.scala
new file mode 100644
index 0000000..26f9988
--- /dev/null
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/yarn/test/YARNContainerFactoryTests.scala
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.yarn.test
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.DateTime
+import org.apache.openwhisk.common.{PrintStreamLogging, TransactionId}
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig._
+import org.apache.openwhisk.core.containerpool.ContainerArgsConfig
+import org.apache.openwhisk.core.entity.ExecManifest.ImageName
+import org.apache.openwhisk.core.entity.{ByteSize, ExecManifest,
InvokerInstanceId, SizeUnits}
+import org.apache.openwhisk.core.yarn.{YARNConfig, YARNContainerFactory,
YARNRESTUtil, YARNTask}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FlatSpecLike, Suite}
+
+import scala.collection.immutable.Map
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+@RunWith(classOf[JUnitRunner])
+class YARNContainerFactoryTests extends Suite with BeforeAndAfter with FlatSpecLike {
+
+  val images = Array(
+    ImageName("nodejs6action", Option("openwhisk"), Option("latest")),
+    ImageName("python3action", Option("openwhisk"), Option("latest")))
+
+  val runtimes: String = "{\"runtimes\":{" +
+    "\"nodejs\":[{\"kind\":\"nodejs:6\",\"image\":{\"prefix\":\"openwhisk\",\"name\":\"nodejs6action\",\"tag\":\"latest\"}}]," +
+    "\"python\":[{\"kind\":\"python:3\",\"image\":{\"prefix\":\"openwhisk\",\"name\":\"python3action\",\"tag\":\"latest\"}}]" +
+    "}}"
+
+  implicit val logging: PrintStreamLogging = new PrintStreamLogging()
+  implicit val whiskConfig: WhiskConfig = new WhiskConfig(
+    Map(wskApiHostname -> "apihost", runtimesManifest -> runtimes) ++ wskApiHost)
+
+  val containerArgsConfig =
+    new ContainerArgsConfig(
+      "net1",
+      Seq("dns1", "dns2"),
+      Seq.empty,
+      Seq.empty,
+      Map("extra1" -> Set("e1", "e2"), "extra2" -> Set("e3", "e4")))
+
+  /** Port the mock Resource Manager binds to; also used to build `yarnConfig`'s master URL. */
+  val rmPort = 8088
+
+  /** Delay, in ms, before the mock RM reports a new service as STABLE or a new container as READY. */
+  val rmDelayMS = 1000
+
+  val yarnConfig =
+    YARNConfig(
+      s"http://localhost:$rmPort",
+      yarnLinkLogMessage = true,
+      "openwhisk-action-service",
+      YARNRESTUtil.SIMPLEAUTH,
+      "",
+      "",
+      "default",
+      "256",
+      1)
+  val instance0 = new InvokerInstanceId(0, Some("invoker0"), Some("invoker0"), ByteSize(0, SizeUnits.BYTE))
+  val instance1 = new InvokerInstanceId(1, Some("invoker1"), Some("invoker1"), ByteSize(0, SizeUnits.BYTE))
+  val serviceName0 = yarnConfig.serviceName + "-0"
+  val serviceName1 = yarnConfig.serviceName + "-1"
+
+  val properties: Map[String, Set[String]] = Map[String, Set[String]]()
+  ExecManifest.initialize(whiskConfig)
+
+  /**
+   * Runs `test` against a freshly started mock RM and one initialized factory per invoker instance, each
+   * on its own ActorSystem. The actor systems are terminated and the mock RM is stopped even when an
+   * assertion fails, so the RM port is released for the next test.
+   */
+  private def withFactories(instances: InvokerInstanceId*)(
+    test: (MockYARNRM, Seq[YARNContainerFactory]) => Unit): Unit = {
+    val rm = new MockYARNRM(rmPort, rmDelayMS)
+    rm.start()
+    try {
+      val systems = instances.map(_ => ActorSystem())
+      try {
+        val factories = instances.zip(systems).map {
+          case (instance, system) =>
+            val factory =
+              new YARNContainerFactory(system, logging, whiskConfig, instance, properties, containerArgsConfig, yarnConfig)
+            factory.init()
+            factory
+        }
+        test(rm, factories)
+      } finally {
+        systems.foreach(system => Await.result(system.terminate(), 30.seconds))
+      }
+    } finally {
+      rm.stop()
+    }
+  }
+
+  /** Single-invoker variant of `withFactories`. */
+  private def withFactory(instance: InvokerInstanceId)(test: (MockYARNRM, YARNContainerFactory) => Unit): Unit =
+    withFactories(instance)((rm, factories) => test(rm, factories.head))
+
+  /** Requests one container of `image` from `factory`, with the fixed arguments every test uses. */
+  private def requestContainer(factory: YARNContainerFactory, image: ImageName) =
+    factory.createContainer(
+      TransactionId.testing,
+      "name",
+      image,
+      unuseduserProvidedImage = true,
+      ByteSize(256, SizeUnits.MB),
+      1)
+
+  /** The mock RM's component for `image` within `serviceName`, if both exist. */
+  private def componentFor(rm: MockYARNRM, serviceName: String, image: ImageName) =
+    rm.services.get(serviceName).flatMap(_.components.find(_.name == image.name))
+
+  /** The mock RM's container count for `image` within `serviceName`, if known. */
+  private def containerCount(rm: MockYARNRM, serviceName: String, image: ImageName) =
+    componentFor(rm, serviceName, image).flatMap(_.number_of_containers)
+
+  /** Instance names of the containers the mock RM holds for `image` within `serviceName`. */
+  private def containerNames(rm: MockYARNRM, serviceName: String, image: ImageName): Seq[String] =
+    componentFor(rm, serviceName, image).flatMap(_.containers).toSeq.flatMap(_.map(_.component_instance_name))
+
+  /** True when the last flex-up of `image`'s component has already completed (i.e. its time has passed). */
+  private def flexCompleted(rm: MockYARNRM, serviceName: String, image: ImageName): Boolean =
+    rm.flexCompletionTimes.get(serviceName).flatMap(_.get(image.name)).exists(_ < DateTime.now)
+
+  behavior of "YARNContainerFactory"
+
+  it should "initialize correctly with zero containers" in withFactory(instance0) { (rm, _) =>
+    //Service was created
+    assert(rm.services.contains(serviceName0))
+
+    //Factory waited until service was stable
+    assert(rm.initCompletionTimes.get(serviceName0).exists(_ < DateTime.now))
+
+    //No containers were created
+    assert(rm.flexCompletionTimes.get(serviceName0).forall(_.isEmpty))
+
+    //All image types were created as components
+    val componentNamesInService = rm.services(serviceName0).components.map(_.name)
+    assert(images.forall(image => componentNamesInService.contains(image.name)))
+
+    //All components have zero containers
+    assert(rm.services(serviceName0).components.forall(_.number_of_containers.contains(0)))
+  }
+
+  it should "create a container" in withFactory(instance0) { (rm, factory) =>
+    val imageToCreate = images(0)
+    Await.result(requestContainer(factory, imageToCreate), 60.seconds)
+
+    //Container of the correct type was created
+    assert(rm.services.contains(serviceName0))
+    assert(componentFor(rm, serviceName0, imageToCreate).isDefined)
+    assert(containerCount(rm, serviceName0, imageToCreate).contains(1))
+
+    //Factory waited for container to be stable
+    assert(flexCompleted(rm, serviceName0, imageToCreate))
+  }
+
+  it should "destroy the correct container" in withFactory(instance0) { (rm, factory) =>
+    val imageToDelete = images(0)
+    val imageNotToDelete = images(1)
+
+    // Issue every request before awaiting any of them.
+    val futures =
+      Seq(imageNotToDelete, imageToDelete, imageToDelete, imageToDelete).map(requestContainer(factory, _))
+    val containers = futures.map(Await.result(_, 30.seconds))
+
+    val containerToRemove = containers(1)
+    val containerToRemoveName = containerToRemove match {
+      case task: YARNTask => task.component_instance_name
+      case other          => fail(s"Expected a YARNTask but got $other")
+    }
+
+    //Ensure container was created
+    assert(containerNames(rm, serviceName0, imageToDelete).contains(containerToRemoveName))
+
+    Await.result(containerToRemove.destroy()(TransactionId.testing), 30.seconds)
+
+    //Ensure container of the correct type was deleted
+    assert(rm.services.contains(serviceName0))
+    assert(componentFor(rm, serviceName0, imageNotToDelete).isDefined)
+    assert(containerCount(rm, serviceName0, imageNotToDelete).contains(1))
+    assert(componentFor(rm, serviceName0, imageToDelete).isDefined)
+    assert(containerCount(rm, serviceName0, imageToDelete).contains(2))
+    assert(!containerNames(rm, serviceName0, imageToDelete).contains(containerToRemoveName))
+  }
+
+  it should "create and destroy multiple containers" in withFactory(instance0) { (rm, factory) =>
+    val futures = Seq(images(0), images(1), images(0)).map(requestContainer(factory, _))
+    val containers = futures.map(Await.result(_, 30.seconds))
+
+    Await.result(containers(1).destroy()(TransactionId.testing), 30.seconds)
+    Await.result(containers(2).destroy()(TransactionId.testing), 30.seconds)
+
+    //Containers of the correct type were deleted
+    assert(rm.services.contains(serviceName0))
+    assert(componentFor(rm, serviceName0, images(1)).isDefined)
+    assert(containerCount(rm, serviceName0, images(1)).contains(0))
+    assert(componentFor(rm, serviceName0, images(0)).isDefined)
+    assert(containerCount(rm, serviceName0, images(0)).contains(1))
+
+    //Factory waited for container to be stable
+    assert(flexCompleted(rm, serviceName0, images(0)))
+  }
+
+  it should "cleanup" in withFactory(instance0) { (rm, factory) =>
+    factory.cleanup()
+
+    //Service was destroyed
+    assert(!rm.services.contains(serviceName0))
+    assert(!rm.initCompletionTimes.contains(serviceName0))
+    assert(!rm.flexCompletionTimes.contains(serviceName0))
+  }
+
+  it should "support HA" in withFactories(instance0, instance1) { (rm, factories) =>
+    val imageToCreate = images(0)
+    // Issue both requests before awaiting either of them.
+    val futures = factories.map(requestContainer(_, imageToCreate))
+    futures.foreach(Await.result(_, 60.seconds))
+
+    Seq(serviceName0, serviceName1).foreach { serviceName =>
+      //Container of the correct type was created for each invoker instance
+      assert(rm.services.contains(serviceName))
+      assert(componentFor(rm, serviceName, imageToCreate).isDefined)
+      assert(containerCount(rm, serviceName, imageToCreate).contains(1))
+
+      //Each factory waited for its container to be stable
+      assert(flexCompleted(rm, serviceName, imageToCreate))
+    }
+  }
+}