This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new fbe85b1 Adds a leaner OpenWhisk configuration. (#4216)
fbe85b1 is described below
commit fbe85b1a06e4cabbe47b487eabbce95e3d1d5f3d
Author: Pavel Kravchenko <[email protected]>
AuthorDate: Sat Feb 9 16:12:22 2019 +0200
Adds a leaner OpenWhisk configuration. (#4216)
This commit introduces a leaner configuration of the deployment that
eschews kafka & zookeeper. It merges the controller and invoker together and
uses an in-memory bus to communicate between the controller and invoker. The
leaner configuration reduces the amount of memory needed by a deployment.
---
ansible/README.md | 11 +
ansible/group_vars/all | 1 +
ansible/openwhisk.yml | 3 +
ansible/roles/controller/tasks/deploy.yml | 9 +
ansible/roles/controller/tasks/lean.yml | 40 +++
ansible/roles/invoker/tasks/deploy.yml | 48 ++--
.../openwhisk/connector/lean/LeanConsumer.scala | 49 ++++
.../connector/lean/LeanMessagingProvider.scala | 69 +++++
.../openwhisk/connector/lean/LeanProducer.scala | 60 +++++
core/controller/Dockerfile | 23 +-
core/controller/build.gradle | 1 +
.../core/loadBalancer/CommonLoadBalancer.scala | 282 ++++++++++++++++++++
.../openwhisk/core/loadBalancer/LeanBalancer.scala | 104 ++++++++
.../openwhisk/core/loadBalancer/LoadBalancer.scala | 26 +-
.../ShardingContainerPoolBalancer.scala | 289 +++------------------
tests/build.gradle | 37 ++-
tests/performance/preparation/deploy-lean.sh | 38 +++
.../test/ShardingContainerPoolBalancerTests.scala | 8 +-
tools/travis/runLeanSystemTests.sh | 36 +++
tools/travis/setupLeanSystem.sh | 33 +++
tools/vagrant/README.md | 9 +
tools/vagrant/Vagrantfile | 11 +-
22 files changed, 891 insertions(+), 296 deletions(-)
diff --git a/ansible/README.md b/ansible/README.md
index eeca95f..13efbe9 100644
--- a/ansible/README.md
+++ b/ansible/README.md
@@ -280,6 +280,17 @@ This is usually not necessary, however in case you want to
uninstall all prereqs
ansible-playbook -i environments/<environment> prereq.yml -e mode=clean
```
+### Lean Setup
+To have a lean setup (no Kafka, Zookeeper and no Invokers as separate
entities):
+
+At [Deploying Using CouchDB](ansible/README.md#deploying-using-cloudant) step,
replace:
+```
+ansible-playbook -i environments/<environment> openwhisk.yml
+```
+by:
+```
+ansible-playbook -i environments/<environment> openwhisk.yml -e lean=true
+```
### Troubleshooting
Some of the more common problems and their solution are listed here.
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 3e4b3ea..bdb9a70 100755
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -2,6 +2,7 @@
# license agreements; and to You under the Apache License, Version 2.0.
mode: deploy
+lean: false
prompt_user: true
openwhisk_home: "{{ lookup('env', 'OPENWHISK_HOME') | default(playbook_dir ~
'/..', true) }}"
openwhisk_cli_home: "{{ lookup('env', 'OPENWHISK_CLI') |
default(openwhisk_home ~ '/../incubator-openwhisk-cli', true) }}"
diff --git a/ansible/openwhisk.yml b/ansible/openwhisk.yml
index 11111e1..5f6583b 100644
--- a/ansible/openwhisk.yml
+++ b/ansible/openwhisk.yml
@@ -6,11 +6,14 @@
# playbook (currently cloudant.yml or couchdb.yml).
# It assumes that wipe.yml have being deployed at least once.
+
- import_playbook: kafka.yml
+ when: not lean
- import_playbook: controller.yml
- import_playbook: invoker.yml
+ when: not lean
- import_playbook: edge.yml
diff --git a/ansible/roles/controller/tasks/deploy.yml
b/ansible/roles/controller/tasks/deploy.yml
index e13b6f9..7594e28 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -283,6 +283,11 @@
- name: include plugins
include_tasks: "{{ item }}.yml"
with_items: "{{ controller_plugins | default([]) }}"
+ when: not lean
+
+- name: lean controller setup
+ include_tasks: "lean.yml"
+ when: lean
- name: (re)start controller
docker_container:
@@ -296,6 +301,10 @@
env: "{{ env }}"
volumes: "{{ controller_volumes }}"
ports: "{{ ports_to_expose }}"
+ # userns_mode, pid_mode and privileged required when controller running in
lean mode
+ userns_mode: "{{ userns_mode | default('') }}"
+ pid_mode: "{{ pid_mode | default('') }}"
+ privileged: "{{ privileged | default('no') }}"
command:
/bin/sh -c
"exec /init.sh {{ controller_index }}
diff --git a/ansible/roles/controller/tasks/lean.yml
b/ansible/roles/controller/tasks/lean.yml
new file mode 100644
index 0000000..809f13d
--- /dev/null
+++ b/ansible/roles/controller/tasks/lean.yml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
contributor
+# license agreements; and to You under the Apache License, Version 2.0.
+---
+# This plugin will provide controller with Lean Controller parameters
+
+- name: set inventory_hostname to invoker and save controllers data that can
be changed by invoker task
+ set_fact:
+ controller_env: "{{ env }}"
+ inventory_hostname: "invoker0"
+ invoker_index_base: 0
+ name_prefix: "invoker"
+ host_group: "{{ groups['invokers'] }}"
+
+- name: include invoker data
+ include_tasks: "../invoker/tasks/deploy.yml"
+
+- name: save invoker volumes
+ set_fact:
+ invoker_volumes: "{{ volumes.split(',') | reject('search','/logs') |
reject('search','/conf') | reject('search','/coverage') | list }}"
+
+- name: populate volumes
+ set_fact:
+ controller_volumes: >-
+ {{ invoker_volumes }} +
+ {{ controller_volumes }}
+
+- name: populate environment variables for LEAN controller
+ vars:
+ lean_env:
+ "CONFIG_whisk_spi_MessagingProvider":
"org.apache.openwhisk.connector.lean.LeanMessagingProvider"
+ "CONFIG_whisk_spi_LoadBalancerProvider":
"org.apache.openwhisk.core.loadBalancer.LeanBalancer"
+ set_fact:
+ env: "{{ env | combine(controller_env) | combine(lean_env) }}"
+
+- name: provide extended docker container params for controller
+ set_fact:
+ userns_mode: "host"
+ pid_mode: "host"
+ privileged: "yes"
+
diff --git a/ansible/roles/invoker/tasks/deploy.yml
b/ansible/roles/invoker/tasks/deploy.yml
index 9e37b03..2339110 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -26,25 +26,29 @@
# must include a trailing '/'.
#
- name: "pull runtime action images per manifest"
- shell: "docker pull {{runtimes_registry |
default()}}{{item.prefix}}/{{item.name}}:{{item.tag | default()}}"
+ shell: "docker pull {{runtimes_registry |
default()}}{{inv_item.prefix}}/{{inv_item.name}}:{{inv_item.tag | default()}}"
loop: "{{ runtimesManifest.runtimes.values() | sum(start=[]) |
selectattr('deprecated', 'equalto',false) | map(attribute='image') | list |
unique }}"
when: skip_pull_runtimes is not defined or not (skip_pull_runtimes == True
or skip_pull_runtimes.lower() == "true")
register: result
until: (result.rc == 0)
retries: "{{ docker.pull.retries }}"
delay: "{{ docker.pull.delay }}"
+ loop_control:
+ loop_var: inv_item
###
# See comment above for pulling other runtime images.
#
- name: "pull blackboxes action images per manifest"
- shell: "docker pull {{runtimes_registry |
default()}}{{item.prefix}}/{{item.name}}:{{item.tag | default()}}"
+ shell: "docker pull {{runtimes_registry |
default()}}{{inv_item.prefix}}/{{inv_item.name}}:{{inv_item.tag | default()}}"
loop: "{{ runtimesManifest.blackboxes }}"
when: skip_pull_runtimes is not defined or not (skip_pull_runtimes == True
or skip_pull_runtimes.lower() == "true")
register: result
until: (result.rc == 0)
retries: "{{ docker.pull.retries }}"
delay: "{{ docker.pull.delay }}"
+ loop_control:
+ loop_var: inv_item
- name: "determine docker root dir on docker-machine"
uri: url="http://{{ ansible_host }}:{{ docker.port }}/info"
return_content=yes
@@ -89,24 +93,28 @@
- name: copy keystore, key and cert
when: invoker.protocol == "https"
copy:
- src: "{{ item }}"
+ src: "{{ inv_item }}"
mode: 0666
dest: "{{ invoker.confdir }}/{{ invoker_name }}"
become: "{{ invoker.dir.become }}"
with_items:
- - "files/{{ invoker.ssl.keystore.name }}"
- - "files/{{ invoker.ssl.key }}"
- - "files/{{ invoker.ssl.cert }}"
+ - "{{ openwhisk_home }}/ansible/roles/invoker/files/{{
invoker.ssl.keystore.name }}"
+ - "{{ openwhisk_home }}/ansible/roles/invoker/files/{{ invoker.ssl.key }}"
+ - "{{ openwhisk_home }}/ansible/roles/invoker/files/{{ invoker.ssl.cert }}"
+ loop_control:
+ loop_var: inv_item
- name: check, that required databases exist
include_tasks: "{{ openwhisk_home }}/ansible/tasks/db/checkDb.yml"
vars:
- dbName: "{{ item }}"
+ dbName: "{{ inv_item }}"
dbUser: "{{ db.credentials.invoker.user }}"
dbPass: "{{ db.credentials.invoker.pass }}"
with_items:
- "{{ db.whisk.actions }}"
- "{{ db.whisk.activations }}"
+ loop_control:
+ loop_var: inv_item
- name: get running invoker information
uri: url="http://{{ ansible_host }}:{{ docker.port
}}/containers/json?filters={{ '{"name":[ "invoker" ],"ancestor":[ "invoker" ]}'
| urlencode }}" return_content=yes
@@ -141,9 +149,11 @@
- name: determine if index of invoker is same with index of inventory host
fail:
- msg: "invoker index is invalid. expected: /invoker{{
groups['invokers'].index(inventory_hostname) }} found: {{ item.Names[0] }}"
+ msg: "invoker index is invalid. expected: /invoker{{
groups['invokers'].index(inventory_hostname) }} found: {{ inv_item.Names[0] }}"
with_items: "{{ invokerInfo }}"
- when: not invoker.allowMultipleInstances and item.Names[0] != "/{{
invoker_name }}"
+ when: not invoker.allowMultipleInstances and inv_item.Names[0] != "/{{
invoker_name }}"
+ loop_control:
+ loop_var: inv_item
- name: copy jmxremote password file
when: jmx.enabled
@@ -166,12 +176,12 @@
- name: prepare invoker ports
set_fact:
- ports_to_expose: ["{{ invoker.port + (invoker_index | int) }}:8080"]
+ invoker_ports_to_expose: ["{{ invoker.port + (invoker_index | int)
}}:8080"]
- name: expose additional ports if jmxremote is enabled
when: jmx.enabled
set_fact:
- ports_to_expose: "{{ ports_to_expose }} + [ \"{{ jmx.basePortInvoker +
(invoker_index | int) }}:{{ jmx.basePortInvoker + (invoker_index | int) }}\" ]
+ [ \"{{ jmx.rmiBasePortInvoker + (invoker_index | int) }}:{{
jmx.rmiBasePortInvoker + (invoker_index | int) }}\" ]"
+ invoker_ports_to_expose: "{{ invoker_ports_to_expose }} + [ \"{{
jmx.basePortInvoker + (invoker_index | int) }}:{{ jmx.basePortInvoker +
(invoker_index | int) }}\" ] + [ \"{{ jmx.rmiBasePortInvoker + (invoker_index |
int) }}:{{ jmx.rmiBasePortInvoker + (invoker_index | int) }}\" ]"
- name: populate environment variables for invoker
set_fact:
@@ -249,16 +259,20 @@
- name: extend invoker dns env
set_fact:
- env: "{{ env | default({}) | combine(
{'CONFIG_whisk_containerFactory_containerArgs_dnsServers_' ~ item.0: item.1} )
}}"
+ env: "{{ env | default({}) | combine(
{'CONFIG_whisk_containerFactory_containerArgs_dnsServers_' ~ inv_item.0:
inv_item.1} ) }}"
with_indexed_items: "{{ (invoker_container_network_dns_servers |
default()).split(' ')}}"
+ loop_control:
+ loop_var: inv_item
- name: merge extra env variables
set_fact:
env: "{{ env | combine(invoker.extraEnv) }}"
- name: include plugins
- include_tasks: "{{ item }}.yml"
+ include_tasks: "{{ inv_item }}.yml"
with_items: "{{ invoker_plugins | default([]) }}"
+ loop_control:
+ loop_var: inv_item
- name: set invoker volumes
set_fact:
@@ -285,7 +299,7 @@
- name: ensure invoker coverage directory is created with permissions
file:
- path: "{{ coverage_logs_dir }}/invoker/{{ item }}"
+ path: "{{ coverage_logs_dir }}/invoker/{{ inv_item }}"
state: directory
mode: 0777
with_items:
@@ -293,6 +307,8 @@
- common
become: "{{ logs.dir.become }}"
when: coverage_enabled
+ loop_control:
+ loop_var: inv_item
- name: extend invoker volume for coverage
set_fact:
@@ -312,8 +328,9 @@
recreate: true
env: "{{ env }}"
volumes: "{{ volumes }}"
- ports: "{{ ports_to_expose }}"
+ ports: "{{ invoker_ports_to_expose }}"
command: /bin/sh -c "exec /init.sh --id {{ invoker_index }} --uniqueName
{{ invoker_index }} >> /logs/{{ invoker_name }}_logs.log 2>&1"
+ when: not lean
- name: wait until Invoker is up and running
uri:
@@ -325,3 +342,4 @@
until: result.status == 200
retries: 12
delay: 5
+ when: not lean
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanConsumer.scala
b/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanConsumer.scala
new file mode 100644
index 0000000..98b1ace
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanConsumer.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.connector.lean
+
+import scala.concurrent.duration._
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.connector.MessageConsumer
+import java.util.concurrent.BlockingQueue
+import java.util.concurrent.TimeUnit
+
+class LeanConsumer(queue: BlockingQueue[Array[Byte]], override val maxPeek:
Int)(implicit logging: Logging)
+ extends MessageConsumer {
+
+ /**
+ * Long poll for messages. Method returns once message available but no
later than given
+ * duration.
+ *
+ * @param duration the maximum duration for the long poll
+ */
+ override def peek(duration: FiniteDuration, retry: Int): Iterable[(String,
Int, Long, Array[Byte])] = {
+ Option(queue.poll(duration.toMillis, TimeUnit.MILLISECONDS))
+ .map(record => Iterable(("", 0, 0L, record)))
+ .getOrElse(Iterable.empty)
+ }
+
+ /**
+ * There's no cursor to advance since that's done in the poll above.
+ */
+ override def commit(retry: Int): Unit = { /*do nothing*/ }
+
+ override def close(): Unit = {
+ logging.info(this, s"closing lean consumer")
+ }
+}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanMessagingProvider.scala
b/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanMessagingProvider.scala
new file mode 100644
index 0000000..2bcf271
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanMessagingProvider.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.connector.lean
+
+import java.util.concurrent.BlockingQueue
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.mutable.Map
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration.FiniteDuration
+import scala.util.Success
+import scala.util.Try
+
+import akka.actor.ActorSystem
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.connector.MessageConsumer
+import org.apache.openwhisk.core.connector.MessageProducer
+import org.apache.openwhisk.core.connector.MessagingProvider
+import org.apache.openwhisk.core.entity.ByteSize
+
+/**
+ * A simple implementation of MessagingProvider.
+ */
+object LeanMessagingProvider extends MessagingProvider {
+
+ /** Map to hold message queues, the key is the topic */
+ val queues: Map[String, BlockingQueue[Array[Byte]]] =
+ new TrieMap[String, BlockingQueue[Array[Byte]]]
+
+ def getConsumer(config: WhiskConfig, groupId: String, topic: String,
maxPeek: Int, maxPollInterval: FiniteDuration)(
+ implicit logging: Logging,
+ actorSystem: ActorSystem): MessageConsumer = {
+
+ val queue = queues.getOrElseUpdate(topic, new
LinkedBlockingQueue[Array[Byte]]())
+
+ new LeanConsumer(queue, maxPeek)
+ }
+
+ def getProducer(config: WhiskConfig, maxRequestSize: Option[ByteSize] =
None)(
+ implicit logging: Logging,
+ actorSystem: ActorSystem): MessageProducer =
+ new LeanProducer(queues)
+
+ def ensureTopic(config: WhiskConfig, topic: String, topicConfigKey: String,
maxMessageBytes: Option[ByteSize] = None)(
+ implicit logging: Logging): Try[Unit] = {
+ if (queues.contains(topic)) {
+ Success(logging.info(this, s"topic $topic already existed"))
+ } else {
+ queues.put(topic, new LinkedBlockingQueue[Array[Byte]]())
+ Success(logging.info(this, s"topic $topic created"))
+ }
+ }
+}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanProducer.scala
b/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanProducer.scala
new file mode 100644
index 0000000..e555e85
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanProducer.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.connector.lean
+
+import akka.actor.ActorSystem
+import scala.concurrent.Future
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.TopicPartition
+import org.apache.openwhisk.common.Counter
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.connector.Message
+import org.apache.openwhisk.core.connector.MessageProducer
+
+import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
+import scala.collection.mutable.Map
+import java.nio.charset.StandardCharsets
+import scala.concurrent.ExecutionContext
+
+class LeanProducer(queues: Map[String, BlockingQueue[Array[Byte]]])(implicit
logging: Logging, actorSystem: ActorSystem)
+ extends MessageProducer {
+
+ implicit val ec: ExecutionContext = actorSystem.dispatcher
+
+ override def sentCount(): Long = sentCounter.cur
+
+ /** Sends msg to topic. This is an asynchronous operation. */
+ override def send(topic: String, msg: Message, retry: Int = 3):
Future[RecordMetadata] = {
+ implicit val transid = msg.transid
+
+ val queue = queues.getOrElseUpdate(topic, new
LinkedBlockingQueue[Array[Byte]]())
+
+ Future {
+ queue.put(msg.serialize.getBytes(StandardCharsets.UTF_8))
+ sentCounter.next()
+ new RecordMetadata(new TopicPartition(topic, 0), -1, -1,
System.currentTimeMillis(), null, -1, -1)
+ }
+ }
+
+ /** Closes producer. */
+ override def close(): Unit = {
+ logging.info(this, "closing lean producer")
+ }
+
+ private val sentCounter = new Counter()
+}
diff --git a/core/controller/Dockerfile b/core/controller/Dockerfile
index 85950d9..b540124 100644
--- a/core/controller/Dockerfile
+++ b/core/controller/Dockerfile
@@ -8,6 +8,25 @@ ENV UID=1001 \
ENV
SWAGGER_UI_DOWNLOAD_SHA256=3d7ef5ddc59e10f132fe99771498f0f1ba7a2cbfb9585f9863d4191a574c96e7
\
SWAGGER_UI_VERSION=3.6.0
+###################################################################################################
+# It's needed for lean mode where the controller is also an invoker
+###################################################################################################
+ENV DOCKER_VERSION=1.12.0 \
+
DOCKER_DOWNLOAD_SHA256=3dd07f65ea4a7b4c8829f311ab0213bca9ac551b5b24706f3e79a97e22097f8b
+
+RUN apk add --update openssl
+
+# Uncomment to fetch latest version of docker instead: RUN wget -qO-
https://get.docker.com | sh
+# Install docker client
+RUN curl -sSL -o docker-${DOCKER_VERSION}.tgz
https://get.docker.com/builds/Linux/x86_64/docker-${DOCKER_VERSION}.tgz && \
+echo "${DOCKER_DOWNLOAD_SHA256} docker-${DOCKER_VERSION}.tgz" | sha256sum -c
- && \
+tar --strip-components 1 -xvzf docker-${DOCKER_VERSION}.tgz -C /usr/bin
docker/docker && \
+tar --strip-components 1 -xvzf docker-${DOCKER_VERSION}.tgz -C /usr/bin
docker/docker-runc && \
+rm -f docker-${DOCKER_VERSION}.tgz && \
+chmod +x /usr/bin/docker && \
+chmod +x /usr/bin/docker-runc
+##################################################################################################
+
# Install swagger-ui
RUN curl -sSL -o swagger-ui-v${SWAGGER_UI_VERSION}.tar.gz --no-verbose
https://github.com/swagger-api/swagger-ui/archive/v${SWAGGER_UI_VERSION}.tar.gz
&& \
echo "${SWAGGER_UI_DOWNLOAD_SHA256}
swagger-ui-v${SWAGGER_UI_VERSION}.tar.gz" | sha256sum -c - && \
@@ -23,7 +42,9 @@ COPY init.sh /
RUN chmod +x init.sh
RUN adduser -D -u ${UID} -h /home/${NOT_ROOT_USER} -s /bin/bash
${NOT_ROOT_USER}
-USER ${NOT_ROOT_USER}
+
+# It is possible to run as non-root if you don't need invoker capabilities out
of the controller today
+#USER ${NOT_ROOT_USER}
EXPOSE 8080
CMD ["./init.sh", "0"]
diff --git a/core/controller/build.gradle b/core/controller/build.gradle
index 243c776..0059157 100644
--- a/core/controller/build.gradle
+++ b/core/controller/build.gradle
@@ -43,6 +43,7 @@ dependencies {
compile
'com.lightbend.akka.discovery:akka-discovery-kubernetes-api_2.12:0.11.0'
compile
'com.lightbend.akka.discovery:akka-discovery-marathon-api_2.12:0.11.0'
compile project(':common:scala')
+ compile project(':core:invoker')
scoverage gradle.scoverage.deps
}
diff --git
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
new file mode 100644
index 0000000..197e3d1
--- /dev/null
+++
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.loadBalancer
+
+import akka.actor.ActorRef
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.LongAdder
+
+import akka.actor.ActorSystem
+import akka.event.Logging.InfoLevel
+import akka.stream.ActorMaterializer
+import org.apache.kafka.clients.producer.RecordMetadata
+import pureconfig._
+import org.apache.openwhisk.common.LoggingMarkers._
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success}
+
+/**
+ * Abstract class which provides common logic for all LoadBalancer
implementations.
+ */
+abstract class CommonLoadBalancer(config: WhiskConfig,
+ feedFactory: FeedFactory,
+ controllerInstance:
ControllerInstanceId)(implicit val actorSystem: ActorSystem,
+
logging: Logging,
+
materializer: ActorMaterializer,
+
messagingProvider: MessagingProvider)
+ extends LoadBalancer {
+
+ protected implicit val executionContext: ExecutionContext =
actorSystem.dispatcher
+
+ val lbConfig: ShardingContainerPoolBalancerConfig =
+
loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer)
+ protected val invokerPool: ActorRef
+
+ /** State related to invocations and throttling */
+ protected[loadBalancer] val activationSlots = TrieMap[ActivationId,
ActivationEntry]()
+ protected[loadBalancer] val activationPromises =
+ TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
+ protected val activationsPerNamespace = TrieMap[UUID, LongAdder]()
+ protected val totalActivations = new LongAdder()
+ protected val totalBlackBoxActivationMemory = new LongAdder()
+ protected val totalManagedActivationMemory = new LongAdder()
+
+ protected def emitHistogramMetric() = {
+
MetricEmitter.emitHistogramMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance),
totalActivations.longValue)
+ MetricEmitter.emitHistogramMetric(
+ LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, ""),
+ totalBlackBoxActivationMemory.longValue +
totalManagedActivationMemory.longValue)
+ MetricEmitter.emitHistogramMetric(
+ LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, "Blackbox"),
+ totalBlackBoxActivationMemory.longValue)
+ MetricEmitter.emitHistogramMetric(
+ LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, "Managed"),
+ totalManagedActivationMemory.longValue)
+ }
+
+ actorSystem.scheduler.schedule(0.seconds, 10.seconds)(emitHistogramMetric())
+
+ override def activeActivationsFor(namespace: UUID): Future[Int] =
+
Future.successful(activationsPerNamespace.get(namespace).map(_.intValue()).getOrElse(0))
+ override def totalActiveActivations: Future[Int] =
Future.successful(totalActivations.intValue())
+
+ /**
+ * 2. Update local state with the activation to be executed scheduled.
+ *
+ * All activations are tracked in the activationSlots map. Additionally,
blocking invokes
+ * are tracked in the activation results map. When a result is received via
activeack, it
+ * will cause the result to be forwarded to the caller waiting on the
result, and cancel
+ * the DB poll which is also trying to do the same.
+ */
+ protected def setupActivation(msg: ActivationMessage,
+ action: ExecutableWhiskActionMetaData,
+ instance: InvokerInstanceId):
Future[Either[ActivationId, WhiskActivation]] = {
+
+ totalActivations.increment()
+ val isBlackboxInvocation = action.exec.pull
+ val totalActivationMemory =
+ if (isBlackboxInvocation) totalBlackBoxActivationMemory else
totalManagedActivationMemory
+ totalActivationMemory.add(action.limits.memory.megabytes)
+
+ activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new
LongAdder()).increment()
+
+ // Timeout is a multiple of the configured maximum action duration. The
minimum timeout is the configured standard
+ // value for action durations to avoid too tight timeouts.
+ // Timeouts in general are diluted by a configurable factor. In essence
this factor controls how much slack you want
+ // to allow in your topics before you start reporting failed activations.
+ val timeout = (action.limits.timeout.duration.max(TimeLimit.STD_DURATION)
* lbConfig.timeoutFactor) + 1.minute
+
+ val resultPromise = if (msg.blocking) {
+ activationPromises.getOrElseUpdate(msg.activationId,
Promise[Either[ActivationId, WhiskActivation]]()).future
+ } else Future.successful(Left(msg.activationId))
+
+ // Install a timeout handler for the catastrophic case where an active ack
is not received at all
+ // (because say an invoker is down completely, or the connection to the
message bus is disrupted) or when
+ // the active ack is significantly delayed (possibly due to long queues
but the subject should not be penalized);
+ // in this case, if the activation handler is still registered, remove it
and update the books.
+ activationSlots.getOrElseUpdate(
+ msg.activationId, {
+ val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
+ processCompletion(msg.activationId, msg.transid, forced = true,
isSystemError = false, invoker = instance)
+ }
+
+ // please note: timeoutHandler.cancel must be called on all
non-timeout paths, e.g. Success
+ ActivationEntry(
+ msg.activationId,
+ msg.user.namespace.uuid,
+ instance,
+ action.limits.memory.megabytes.MB,
+ action.limits.concurrency.maxConcurrent,
+ action.fullyQualifiedName(true),
+ timeoutHandler,
+ isBlackboxInvocation)
+ })
+
+ resultPromise
+ }
+
+ protected val messageProducer =
+ messagingProvider.getProducer(config,
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+ /** 3. Send the activation to the invoker */
+ protected def sendActivationToInvoker(producer: MessageProducer,
+ msg: ActivationMessage,
+ invoker: InvokerInstanceId):
Future[RecordMetadata] = {
+ implicit val transid: TransactionId = msg.transid
+
+ val topic = s"invoker${invoker.toInt}"
+
+
MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)
+ val start = transid.started(
+ this,
+ LoggingMarkers.CONTROLLER_KAFKA,
+ s"posting topic '$topic' with activation id '${msg.activationId}'",
+ logLevel = InfoLevel)
+
+ producer.send(topic, msg).andThen {
+ case Success(status) =>
+ transid.finished(
+ this,
+ start,
+ s"posted to
${status.topic()}[${status.partition()}][${status.offset()}]",
+ logLevel = InfoLevel)
+ case Failure(_) => transid.failed(this, start, s"error on posting to
topic $topic")
+ }
+ }
+
+ /**
+ * Subscribes to active acks (completion messages from the invokers), and
+ * registers a handler for received active acks from invokers.
+ */
+ private val activationFeed: ActorRef =
+ feedFactory.createFeed(actorSystem, messagingProvider,
processAcknowledgement)
+
+ /** 4. Get the active-ack message and parse it */
+ protected[loadBalancer] def processAcknowledgement(bytes: Array[Byte]):
Future[Unit] = Future {
+ val raw = new String(bytes, StandardCharsets.UTF_8)
+ AcknowledegmentMessage.parse(raw) match {
+ case Success(m: CompletionMessage) =>
+ processCompletion(
+ m.activationId,
+ m.transid,
+ forced = false,
+ isSystemError = m.isSystemError,
+ invoker = m.invoker)
+ activationFeed ! MessageFeed.Processed
+
+ case Success(m: ResultMessage) =>
+ processResult(m.response, m.transid)
+ activationFeed ! MessageFeed.Processed
+
+ case Failure(t) =>
+ activationFeed ! MessageFeed.Processed
+ logging.error(this, s"failed processing message: $raw")
+
+ case _ =>
+ activationFeed ! MessageFeed.Processed
+ logging.error(this, s"Unexpected Acknowledgment message received by
loadbalancer: $raw")
+ }
+ }
+
+ /** 5. Process the result ack and return it to the user */
+ protected def processResult(response: Either[ActivationId, WhiskActivation],
tid: TransactionId): Unit = {
+ val aid = response.fold(l => l, r => r.activationId)
+
+ // Resolve the promise to send the result back to the user.
+ // The activation will be removed from the activation slots later, when
the completion message
+ // is received (because the slot in the invoker is not yet free for new
activations).
+ activationPromises.remove(aid).foreach(_.trySuccess(response))
+ logging.info(this, s"received result ack for '$aid'")(tid)
+ }
+
+ protected def releaseInvoker(invoker: InvokerInstanceId, entry:
ActivationEntry)
+
+ /** 6. Process the completion ack and update the state */
+ protected[loadBalancer] def processCompletion(aid: ActivationId,
+ tid: TransactionId,
+ forced: Boolean,
+ isSystemError: Boolean,
+ invoker: InvokerInstanceId):
Unit = {
+
+ val invocationResult = if (forced) {
+ InvocationFinishedResult.Timeout
+ } else {
+ // If the response contains a system error, report that, otherwise
report Success
+ // Left generally is considered a Success, since that could be a message
not fitting into Kafka
+ if (isSystemError) {
+ InvocationFinishedResult.SystemError
+ } else {
+ InvocationFinishedResult.Success
+ }
+ }
+
+ activationSlots.remove(aid) match {
+ case Some(entry) =>
+ totalActivations.decrement()
+ val totalActivationMemory =
+ if (entry.isBlackbox) totalBlackBoxActivationMemory else
totalManagedActivationMemory
+ totalActivationMemory.add(entry.memory.toMB * (-1))
+ activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
+
+ releaseInvoker(invoker, entry)
+
+ if (!forced) {
+ entry.timeoutHandler.cancel()
+ // notice here that the activationPromises is not touched, because
the expectation is that
+ // the active ack is received as expected, and processing that
message removed the promise
+ // from the corresponding map
+ } else {
+ // the entry has timed out; if the active ack is still around,
remove its entry also
+ // and complete the promise with a failure if necessary
+ activationPromises
+ .remove(aid)
+ .foreach(_.tryFailure(new Throwable("no completion or active ack
received yet")))
+ }
+
+ logging.info(this, s"${if (!forced) "received" else "forced"}
completion ack for '$aid'")(tid)
+ // Active acks that are received here are strictly from user actions -
health actions are not part of
+ // the load balancer's activation map. Inform the invoker pool
supervisor of the user action completion.
+ // guard this
+ invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
+ case None if tid == TransactionId.invokerHealth =>
+ // Health actions do not have an ActivationEntry as they are written
on the message bus directly. Their result
+ // is important to pass to the invokerPool because they are used to
determine if the invoker can be considered
+ // healthy again.
+ logging.info(this, s"received completion ack for health action on
$invoker")(tid)
+ // guard this
+ invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
+ case None if !forced =>
+ // Received an active-ack that has already been taken out of the state
because of a timeout (forced active-ack).
+ // The result is ignored because a timeout has already been reported
to the invokerPool per the force.
+ logging.debug(this, s"received completion ack for '$aid' which has no
entry")(tid)
+ case None =>
+ // The entry has already been removed by an active ack. This part of
the code is reached by the timeout and can
+ // happen if active-ack and timeout happen roughly at the same time
(the timeout was triggered before the active
+ // ack canceled the timer). As the active ack is already processed we
don't have to do anything here.
+ logging.debug(this, s"forced completion ack for '$aid' which has no
entry")(tid)
+ }
+ }
+}
diff --git
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LeanBalancer.scala
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LeanBalancer.scala
new file mode 100644
index 0000000..130203a
--- /dev/null
+++
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LeanBalancer.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.loadBalancer
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.stream.ActorMaterializer
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.containerpool.ContainerPoolConfig
+import org.apache.openwhisk.core.entity.ControllerInstanceId
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.invoker.InvokerReactive
+import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig._
+import org.apache.openwhisk.core.entity.size._
+
+import scala.concurrent.Future
+
+/**
+ * Lean loadbalancer implementation.
+ *
+ * Communicates with the Invoker directly, without Kafka in the middle. The Invoker
does not exist as a separate entity; it is built together with the Controller.
+ * Uses LeanMessagingProvider to provide an in-memory queue instead of Kafka.
+ */
+class LeanBalancer(config: WhiskConfig,
+ feedFactory: FeedFactory,
+ controllerInstance: ControllerInstanceId,
+ implicit val messagingProvider: MessagingProvider =
SpiLoader.get[MessagingProvider])(
+ implicit actorSystem: ActorSystem,
+ logging: Logging,
+ materializer: ActorMaterializer)
+ extends CommonLoadBalancer(config, feedFactory, controllerInstance) {
+
+ /** Loadbalancer interface methods */
+ override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] =
Future.successful(IndexedSeq.empty[InvokerHealth])
+ override def clusterSize: Int = 1
+
+ val poolConfig: ContainerPoolConfig =
loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)
+
+ val invokerName = InvokerInstanceId(0, None, None, poolConfig.userMemory)
+
+ /** 1. Publish a message to the loadbalancer */
+ override def publish(action: ExecutableWhiskActionMetaData, msg:
ActivationMessage)(
+ implicit transid: TransactionId): Future[Future[Either[ActivationId,
WhiskActivation]]] = {
+
+ /** 2. Update local state with the activation scheduled for execution. */
+ val activationResult = setupActivation(msg, action, invokerName)
+ sendActivationToInvoker(messageProducer, msg, invokerName).map(_ =>
activationResult)
+ }
+
+ /** Creates an invoker for executing user actions. There is only one invoker
in the lean model. */
+ private def makeALocalThreadedInvoker() {
+ implicit val ec =
ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+ val actorSystema: ActorSystem =
+ ActorSystem(name = "invoker-actor-system", defaultExecutionContext =
Some(ec))
+ new InvokerReactive(config, invokerName, messageProducer)(actorSystema,
implicitly)
+ }
+
+ makeALocalThreadedInvoker()
+
+ override protected val invokerPool: ActorRef =
actorSystem.actorOf(Props.empty)
+
+ override protected def releaseInvoker(invoker: InvokerInstanceId, entry:
ActivationEntry) = {
+ // Currently do nothing
+ }
+
+ override protected def emitHistogramMetric() = {
+ super.emitHistogramMetric()
+ }
+}
+
+object LeanBalancer extends LoadBalancerProvider {
+
+ override def instance(whiskConfig: WhiskConfig, instance:
ControllerInstanceId)(
+ implicit actorSystem: ActorSystem,
+ logging: Logging,
+ materializer: ActorMaterializer): LoadBalancer = {
+
+ new LeanBalancer(whiskConfig, createFeedFactory(whiskConfig, instance),
instance)
+ }
+
+ def requiredProperties =
+ Map(runtimesRegistry -> "") ++
+ ExecManifest.requiredProperties ++
+ wskApiHost
+}
diff --git
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
index da1e1b4..041ce0c 100644
---
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
+++
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
@@ -18,13 +18,14 @@
package org.apache.openwhisk.core.loadBalancer
import scala.concurrent.Future
-import akka.actor.ActorSystem
+import akka.actor.{ActorRefFactory, ActorSystem, Props}
import akka.stream.ActorMaterializer
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.spi.Spi
+import scala.concurrent.duration._
/**
* Describes an abstract invoker. An invoker is a local container pool manager
that
@@ -85,6 +86,29 @@ trait LoadBalancerProvider extends Spi {
def instance(whiskConfig: WhiskConfig, instance:
ControllerInstanceId)(implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): LoadBalancer
+
+ /** Return default FeedFactory */
+ def createFeedFactory(whiskConfig: WhiskConfig, instance:
ControllerInstanceId)(implicit actorSystem: ActorSystem,
+
logging: Logging): FeedFactory = {
+
+ val activeAckTopic = s"completed${instance.asString}"
+ val maxActiveAcksPerPoll = 128
+ val activeAckPollDuration = 1.second
+
+ new FeedFactory {
+ def createFeed(f: ActorRefFactory, provider: MessagingProvider, acker:
Array[Byte] => Future[Unit]) = {
+ f.actorOf(Props {
+ new MessageFeed(
+ "activeack",
+ logging,
+ provider.getConsumer(whiskConfig, activeAckTopic, activeAckTopic,
maxPeek = maxActiveAcksPerPoll),
+ maxActiveAcksPerPoll,
+ activeAckPollDuration,
+ acker)
+ })
+ }
+ }
+ }
}
/** Exception thrown by the loadbalancer */
diff --git
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 914b3ac..0464393 100644
---
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -19,34 +19,27 @@ package org.apache.openwhisk.core.loadBalancer
import akka.actor.ActorRef
import akka.actor.ActorRefFactory
-import java.nio.charset.StandardCharsets
import java.util.concurrent.ThreadLocalRandom
-import java.util.concurrent.atomic.LongAdder
import akka.actor.{Actor, ActorSystem, Cancellable, Props}
import akka.cluster.ClusterEvent._
import akka.cluster.{Cluster, Member, MemberStatus}
-import akka.event.Logging.InfoLevel
import akka.management.AkkaManagement
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.producer.RecordMetadata
import pureconfig._
-import org.apache.openwhisk.common.LoggingMarkers._
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.WhiskConfig._
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.entity._
-import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.common.LoggingMarkers._
import org.apache.openwhisk.core.loadBalancer.InvokerState.{Healthy, Offline,
Unhealthy, Unresponsive}
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.spi.SpiLoader
import scala.annotation.tailrec
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.{Failure, Success}
+import scala.concurrent.Future
/**
* A loadbalancer that schedules workload based on a hashing-algorithm.
@@ -151,17 +144,13 @@ import scala.util.{Failure, Success}
class ShardingContainerPoolBalancer(
config: WhiskConfig,
controllerInstance: ControllerInstanceId,
- private val feedFactory: FeedFactory,
- private val invokerPoolFactory: InvokerPoolFactory,
- lbConfig: ShardingContainerPoolBalancerConfig =
-
loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer),
- private val messagingProvider: MessagingProvider =
SpiLoader.get[MessagingProvider])(
- implicit val actorSystem: ActorSystem,
+ feedFactory: FeedFactory,
+ val invokerPoolFactory: InvokerPoolFactory,
+ implicit val messagingProvider: MessagingProvider =
SpiLoader.get[MessagingProvider])(
+ implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer)
- extends LoadBalancer {
-
- private implicit val executionContext: ExecutionContext =
actorSystem.dispatcher
+ extends CommonLoadBalancer(config, feedFactory, controllerInstance) {
/** Build a cluster of all loadbalancers */
private val cluster: Option[Cluster] = if
(loadConfigOrThrow[ClusterConfig](ConfigKeys.cluster).useClusterBootstrap) {
@@ -174,45 +163,26 @@ class ShardingContainerPoolBalancer(
None
}
- /** State related to invocations and throttling */
- protected[loadBalancer] val activationSlots = TrieMap[ActivationId,
ActivationEntry]()
- protected[loadBalancer] val activationPromises =
- TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
- private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
- private val totalActivations = new LongAdder()
- private val totalBlackBoxActivationMemory = new LongAdder()
- private val totalManagedActivationMemory = new LongAdder()
-
- /** State needed for scheduling. */
- protected[loadBalancer] val schedulingState =
ShardingContainerPoolBalancerState()(lbConfig)
-
- actorSystem.scheduler.schedule(0.seconds, 10.seconds) {
-
MetricEmitter.emitHistogramMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance),
totalActivations.longValue)
- MetricEmitter.emitHistogramMetric(
- LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, ""),
- totalBlackBoxActivationMemory.longValue +
totalManagedActivationMemory.longValue)
+ override protected def emitHistogramMetric() = {
+ super.emitHistogramMetric()
MetricEmitter.emitHistogramMetric(
- LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, "Blackbox"),
- totalBlackBoxActivationMemory.longValue)
- MetricEmitter.emitHistogramMetric(
- LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, "Managed"),
- totalManagedActivationMemory.longValue)
- MetricEmitter.emitHistogramMetric(INVOKER_TOTALMEM_BLACKBOX,
schedulingState.blackboxInvokers.foldLeft(0L) {
- (total, curr) =>
+ INVOKER_TOTALMEM_BLACKBOX,
+ schedulingState.blackboxInvokers.foldLeft(0L) { (total, curr) =>
if (curr.status.isUsable) {
curr.id.userMemory.toMB + total
} else {
total
}
- })
- MetricEmitter.emitHistogramMetric(INVOKER_TOTALMEM_MANAGED,
schedulingState.managedInvokers.foldLeft(0L) {
- (total, curr) =>
+ })
+ MetricEmitter.emitHistogramMetric(
+ INVOKER_TOTALMEM_MANAGED,
+ schedulingState.managedInvokers.foldLeft(0L) { (total, curr) =>
if (curr.status.isUsable) {
curr.id.userMemory.toMB + total
} else {
total
}
- })
+ })
MetricEmitter.emitHistogramMetric(
HEALTHY_INVOKER_MANAGED,
schedulingState.managedInvokers.count(_.status == Healthy))
@@ -239,6 +209,9 @@ class ShardingContainerPoolBalancer(
schedulingState.blackboxInvokers.count(_.status == Offline))
}
+ /** State needed for scheduling. */
+ val schedulingState = ShardingContainerPoolBalancerState()(lbConfig)
+
/**
* Monitors invoker supervision and the cluster to update the state
sequentially
*
@@ -283,9 +256,6 @@ class ShardingContainerPoolBalancer(
/** Loadbalancer interface methods */
override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] =
Future.successful(schedulingState.invokers)
- override def activeActivationsFor(namespace: UUID): Future[Int] =
-
Future.successful(activationsPerNamespace.get(namespace).map(_.intValue()).getOrElse(0))
- override def totalActiveActivations: Future[Int] =
Future.successful(totalActivations.intValue())
override def clusterSize: Int = schedulingState.clusterSize
/** 1. Publish a message to the loadbalancer */
@@ -344,201 +314,19 @@ class ShardingContainerPoolBalancer(
}
}
- /**
- * 2. Update local state with the to be executed activation.
- *
- * All activations are tracked in the activationSlots map. Additionally,
blocking invokes
- * are tracked in the activation results map. When a result is received via
activeack, it
- * will cause the result to be forwarded to the caller waiting on the
result, and cancel
- * the DB poll which is also trying to do the same.
- */
- private def setupActivation(msg: ActivationMessage,
- action: ExecutableWhiskActionMetaData,
- instance: InvokerInstanceId):
Future[Either[ActivationId, WhiskActivation]] = {
-
- totalActivations.increment()
- val isBlackboxInvocation = action.exec.pull
- val totalActivationMemory =
- if (isBlackboxInvocation) totalBlackBoxActivationMemory else
totalManagedActivationMemory
- totalActivationMemory.add(action.limits.memory.megabytes)
-
- activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new
LongAdder()).increment()
-
- // Timeout is a multiple of the configured maximum action duration. The
minimum timeout is the configured standard
- // value for action durations to avoid too tight timeouts.
- // Timeouts in general are diluted by a configurable factor. In essence
this factor controls how much slack you want
- // to allow in your topics before you start reporting failed activations.
- val timeout = (action.limits.timeout.duration.max(TimeLimit.STD_DURATION)
* lbConfig.timeoutFactor) + 1.minute
-
- val resultPromise = if (msg.blocking) {
- activationPromises.getOrElseUpdate(msg.activationId,
Promise[Either[ActivationId, WhiskActivation]]()).future
- } else Future.successful(Left(msg.activationId))
-
- // Install a timeout handler for the catastrophic case where an active ack
is not received at all
- // (because say an invoker is down completely, or the connection to the
message bus is disrupted) or when
- // the active ack is significantly delayed (possibly dues to long queues
but the subject should not be penalized);
- // in this case, if the activation handler is still registered, remove it
and update the books.
- activationSlots.getOrElseUpdate(
- msg.activationId, {
- val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
- processCompletion(msg.activationId, msg.transid, forced = true,
isSystemError = false, invoker = instance)
- }
-
- // please note: timeoutHandler.cancel must be called on all
non-timeout paths, e.g. Success
- ActivationEntry(
- msg.activationId,
- msg.user.namespace.uuid,
- instance,
- action.limits.memory.megabytes.MB,
- action.limits.concurrency.maxConcurrent,
- action.fullyQualifiedName(true),
- timeoutHandler,
- isBlackboxInvocation)
- })
-
- resultPromise
- }
-
- private val messageProducer = messagingProvider.getProducer(config,
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
-
- /** 3. Send the activation to the invoker */
- private def sendActivationToInvoker(producer: MessageProducer,
- msg: ActivationMessage,
- invoker: InvokerInstanceId):
Future[RecordMetadata] = {
- implicit val transid: TransactionId = msg.transid
-
- val topic = s"invoker${invoker.toInt}"
-
-
MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)
- val start = transid.started(this, LoggingMarkers.CONTROLLER_KAFKA)
-
- producer.send(topic, msg).andThen {
- case Success(status) =>
- transid.finished(
- this,
- start,
- s"posted to
${status.topic()}[${status.partition()}][${status.offset()}]",
- logLevel = InfoLevel)
- case Failure(_) => transid.failed(this, start, s"error on posting to
topic $topic")
- }
- }
-
- /**
- * Subscribes to active acks (completion messages from the invokers), and
- * registers a handler for received active acks from invokers.
- */
- private val activationFeed: ActorRef =
- feedFactory.createFeed(actorSystem, messagingProvider,
processAcknowledgement)
-
- /** 4. Get the active-ack message and parse it */
- protected[loadBalancer] def processAcknowledgement(bytes: Array[Byte]):
Future[Unit] = Future {
- val raw = new String(bytes, StandardCharsets.UTF_8)
- AcknowledegmentMessage.parse(raw) match {
- case Success(m: CompletionMessage) =>
- processCompletion(
- m.activationId,
- m.transid,
- forced = false,
- isSystemError = m.isSystemError,
- invoker = m.invoker)
- activationFeed ! MessageFeed.Processed
-
- case Success(m: ResultMessage) =>
- processResult(m.response, m.transid)
- activationFeed ! MessageFeed.Processed
-
- case Failure(t) =>
- activationFeed ! MessageFeed.Processed
- logging.error(this, s"failed processing message: $raw")
-
- case _ =>
- activationFeed ! MessageFeed.Processed
- logging.error(this, s"Unexpected Acknowledgment message received by
loadbalancer: $raw")
- }
- }
-
- /** 5. Process the result ack and return it to the user */
- private def processResult(response: Either[ActivationId, WhiskActivation],
tid: TransactionId): Unit = {
- val aid = response.fold(l => l, r => r.activationId)
-
- // Resolve the promise to send the result back to the user.
- // The activation will be removed from the activation slots later, when
the completion message
- // is received (because the slot in the invoker is not yet free for new
activations).
- activationPromises.remove(aid).foreach(_.trySuccess(response))
- logging.info(this, s"received result ack for '$aid'")(tid)
- }
-
- /** Process the completion ack and update the state */
- protected[loadBalancer] def processCompletion(aid: ActivationId,
- tid: TransactionId,
- forced: Boolean,
- isSystemError: Boolean,
- invoker: InvokerInstanceId):
Unit = {
-
- val invocationResult = if (forced) {
- InvocationFinishedResult.Timeout
- } else {
- // If the response contains a system error, report that, otherwise
report Success
- // Left generally is considered a Success, since that could be a message
not fitting into Kafka
- if (isSystemError) {
- InvocationFinishedResult.SystemError
- } else {
- InvocationFinishedResult.Success
- }
- }
-
- activationSlots.remove(aid) match {
- case Some(entry) =>
- totalActivations.decrement()
- val totalActivationMemory =
- if (entry.isBlackbox) totalBlackBoxActivationMemory else
totalManagedActivationMemory
- totalActivationMemory.add(entry.memory.toMB * (-1))
- activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
- schedulingState.invokerSlots
- .lift(invoker.toInt)
- .foreach(_.releaseConcurrent(entry.fullyQualifiedEntityName,
entry.maxConcurrent, entry.memory.toMB.toInt))
- if (!forced) {
- entry.timeoutHandler.cancel()
- // notice here that the activationPromises is not touched, because
the expectation is that
- // the active ack is received as expected, and processing that
message removed the promise
- // from the corresponding map
- } else {
- // the entry has timed out; if the active ack is still around,
remove its entry also
- // and complete the promise with a failure if necessary
- activationPromises
- .remove(aid)
- .foreach(_.tryFailure(new Throwable("no completion or active ack
received yet")))
- }
-
- logging.info(this, s"${if (!forced) "received" else "forced"}
completion ack for '$aid'")(tid)
- // Active acks that are received here are strictly from user actions -
health actions are not part of
- // the load balancer's activation map. Inform the invoker pool
supervisor of the user action completion.
- invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
- case None if tid == TransactionId.invokerHealth =>
- // Health actions do not have an ActivationEntry as they are written
on the message bus directly. Their result
- // is important to pass to the invokerPool because they are used to
determine if the invoker can be considered
- // healthy again.
- logging.info(this, s"received completion ack for health action on
$invoker")(tid)
- invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
- case None if !forced =>
- // Received an active-ack that has already been taken out of the state
because of a timeout (forced active-ack).
- // The result is ignored because a timeout has already been reported
to the invokerPool per the force.
- logging.debug(this, s"received completion ack for '$aid' which has no
entry")(tid)
- case None =>
- // The entry has already been removed by an active ack. This part of
the code is reached by the timeout and can
- // happen if active-ack and timeout happen roughly at the same time
(the timeout was triggered before the active
- // ack canceled the timer). As the active ack is already processed we
don't have to do anything here.
- logging.debug(this, s"forced completion ack for '$aid' which has no
entry")(tid)
- }
- }
-
- private val invokerPool =
+ override val invokerPool =
invokerPoolFactory.createInvokerPool(
actorSystem,
messagingProvider,
messageProducer,
sendActivationToInvoker,
Some(monitor))
+
+ override protected def releaseInvoker(invoker: InvokerInstanceId, entry:
ActivationEntry) = {
+ schedulingState.invokerSlots
+ .lift(invoker.toInt)
+ .foreach(_.releaseConcurrent(entry.fullyQualifiedEntityName,
entry.maxConcurrent, entry.memory.toMB.toInt))
+ }
}
object ShardingContainerPoolBalancer extends LoadBalancerProvider {
@@ -548,24 +336,6 @@ object ShardingContainerPoolBalancer extends
LoadBalancerProvider {
logging: Logging,
materializer: ActorMaterializer): LoadBalancer = {
- val activeAckTopic = s"completed${instance.asString}"
- val maxActiveAcksPerPoll = 128
- val activeAckPollDuration = 1.second
-
- val feedFactory = new FeedFactory {
- def createFeed(f: ActorRefFactory, provider: MessagingProvider, acker:
Array[Byte] => Future[Unit]) = {
- f.actorOf(Props {
- new MessageFeed(
- "activeack",
- logging,
- provider.getConsumer(whiskConfig, activeAckTopic, activeAckTopic,
maxPeek = maxActiveAcksPerPoll),
- maxActiveAcksPerPoll,
- activeAckPollDuration,
- acker)
- })
- }
- }
-
val invokerPoolFactory = new InvokerPoolFactory {
override def createInvokerPool(
actorRefFactory: ActorRefFactory,
@@ -585,7 +355,11 @@ object ShardingContainerPoolBalancer extends
LoadBalancerProvider {
}
}
- new ShardingContainerPoolBalancer(whiskConfig, instance, feedFactory,
invokerPoolFactory)
+ new ShardingContainerPoolBalancer(
+ whiskConfig,
+ instance,
+ createFeedFactory(whiskConfig, instance),
+ invokerPoolFactory)
}
def requiredProperties: Map[String, String] = kafkaHosts
@@ -613,7 +387,6 @@ object ShardingContainerPoolBalancer extends
LoadBalancerProvider {
*
* @param maxConcurrent concurrency limit supported by this action
* @param invokers a list of available invokers to search in, including
their state
- * @param concurrentSlots optional map of invoker -> semaphore to track
concurrency slots for this action
* @param dispatched semaphores for each invoker to give the slots away from
* @param slots Number of slots, that need to be acquired (e.g. memory in MB)
* @param index the index to start from (initially should be the
"homeInvoker"
diff --git a/tests/build.gradle b/tests/build.gradle
index e427d72..5fda213 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -51,6 +51,20 @@ def leanExcludes = [
'**/MaxActionDurationTests*',
]
+def systemIncludes = [
+ "org/apache/openwhisk/core/limits/**",
+ "org/apache/openwhisk/core/admin/**",
+ "org/apache/openwhisk/core/cli/test/**",
+ "org/apache/openwhisk/core/apigw/actions/test/**",
+ "org/apache/openwhisk/core/database/test/*CacheConcurrencyTests*",
+ "org/apache/openwhisk/core/controller/test/*ControllerApiTests*",
+ "apigw/healthtests/**",
+ "ha/**",
+ "services/**",
+ "system/basic/**",
+ "system/rest/**",
+]
+
ext.testSets = [
"REQUIRE_ONLY_DB" : [
"includes" : [
@@ -67,19 +81,7 @@ ext.testSets = [
]
],
"REQUIRE_SYSTEM" : [
- "includes" : [
- "org/apache/openwhisk/core/limits/**",
- "org/apache/openwhisk/core/admin/**",
- "org/apache/openwhisk/core/cli/test/**",
- "org/apache/openwhisk/core/apigw/actions/test/**",
- "org/apache/openwhisk/core/database/test/*CacheConcurrencyTests*",
- "org/apache/openwhisk/core/controller/test/*ControllerApiTests*",
- "apigw/healthtests/**",
- "ha/**",
- "services/**",
- "system/basic/**",
- "system/rest/**",
- ],
+ "includes" : systemIncludes,
"excludes": [
"system/basic/WskMultiRuntimeTests*"
]
@@ -98,6 +100,15 @@ ext.testSets = [
"includes" : [
"system/basic/**"
]
+ ],
+ "REQUIRE_LEAN_SYSTEM" : [
+ "includes" : systemIncludes,
+
+ // Test suites below require Kafka, so they are excluded from Lean
System tests
+ "excludes" : [
+ "**/*KafkaConnectorTests*",
+ "system/basic/WskMultiRuntimeTests*"
+ ]
]
]
diff --git a/tests/performance/preparation/deploy-lean.sh
b/tests/performance/preparation/deploy-lean.sh
new file mode 100755
index 0000000..361003b
--- /dev/null
+++ b/tests/performance/preparation/deploy-lean.sh
@@ -0,0 +1,38 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+set -e
+SCRIPTDIR="$(cd "$(dirname "$0")"; pwd)"
+ROOTDIR="$SCRIPTDIR/../../.."
+
+# Build Openwhisk
+cd $ROOTDIR
+TERM=dumb ./gradlew distDocker -PdockerImagePrefix=testing $GRADLE_PROJS_SKIP
+
+# Deploy Openwhisk
+cd $ROOTDIR/ansible
+ANSIBLE_CMD="$ANSIBLE_CMD -e limit_invocations_per_minute=999999 -e
limit_invocations_concurrent=999999 -e controller_client_auth=false -e
userLogs_spi=\"org.apache.openwhisk.core.containerpool.logging.LogDriverLogStoreProvider\""
+
+$ANSIBLE_CMD setup.yml
+
+$ANSIBLE_CMD prereq.yml
+$ANSIBLE_CMD couchdb.yml
+$ANSIBLE_CMD initdb.yml
+$ANSIBLE_CMD wipe.yml
+
+$ANSIBLE_CMD controller.yml -e lean=true
+$ANSIBLE_CMD edge.yml
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index cda8dfd..42ddbcf 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -476,13 +476,7 @@ class ShardingContainerPoolBalancerTests
TestProbe().testActor
}
val balancer =
- new ShardingContainerPoolBalancer(
- config,
- ControllerInstanceId("0"),
- feedProbe,
- invokerPoolProbe,
- lbConfig(0.0),
- mockMessaging)
+ new ShardingContainerPoolBalancer(config, ControllerInstanceId("0"),
feedProbe, invokerPoolProbe, mockMessaging)
val invokers = IndexedSeq.tabulate(numInvokers) { i =>
new InvokerHealth(InvokerInstanceId(i, userMemory = invokerMem), Healthy)
diff --git a/tools/travis/runLeanSystemTests.sh
b/tools/travis/runLeanSystemTests.sh
new file mode 100755
index 0000000..372fbc1
--- /dev/null
+++ b/tools/travis/runLeanSystemTests.sh
@@ -0,0 +1,36 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -e
+
+SCRIPTDIR=$(cd $(dirname "$0") && pwd)
+ROOTDIR="$SCRIPTDIR/../.."
+
+cd $ROOTDIR/tools/travis
+
+export ORG_GRADLE_PROJECT_testSetName="REQUIRE_LEAN_SYSTEM"
+export GRADLE_COVERAGE=true
+
+./setupPrereq.sh /ansible/files/runtimes-nodeonly.json
+
+./distDocker-lean.sh
+
+./setupLeanSystem.sh /ansible/files/runtimes-nodeonly.json
+
+./runTests.sh
diff --git a/tools/travis/setupLeanSystem.sh b/tools/travis/setupLeanSystem.sh
new file mode 100755
index 0000000..89568b2
--- /dev/null
+++ b/tools/travis/setupLeanSystem.sh
@@ -0,0 +1,33 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -e
+
+# Build script for Travis-CI.
+SECONDS=0
+SCRIPTDIR=$(cd $(dirname "$0") && pwd)
+ROOTDIR="$SCRIPTDIR/../.."
+RUNTIMES_MANIFEST=${1:-"/ansible/files/runtimes.json"}
+
+cd $ROOTDIR/ansible
+
+$ANSIBLE_CMD openwhisk.yml -e manifest_file="$RUNTIMES_MANIFEST" -e lean=true
+$ANSIBLE_CMD apigateway.yml
+$ANSIBLE_CMD routemgmt.yml
+
+echo "Time taken for ${0##*/} is $SECONDS secs"
diff --git a/tools/vagrant/README.md b/tools/vagrant/README.md
index fcec706..0697c4b 100644
--- a/tools/vagrant/README.md
+++ b/tools/vagrant/README.md
@@ -309,3 +309,12 @@ Ignore error message `Sub-process /usr/bin/dpkg returned
an error code (1)` when
creating Vagrant VM using `gui-true`. Remember to use `gui=true` every time you
do `vagrant reload`. Or, you can enable the GUI directly by editing the Vagrant
file.
+
+## Lean Setup
+A lean setup runs without Kafka and Zookeeper, and without Invokers as separate entities.
+
+To enable it, set the environment variable `LEAN` to `true` before creating the Vagrant VM:
+```
+export LEAN=true
+```
+
diff --git a/tools/vagrant/Vagrantfile b/tools/vagrant/Vagrantfile
index 2466e35..ced5983 100644
--- a/tools/vagrant/Vagrantfile
+++ b/tools/vagrant/Vagrantfile
@@ -149,7 +149,16 @@ Vagrant.configure('2') do |config|
echo "`date`: deploy-start" >> /tmp/vagrant-times.txt
cd ${ANSIBLE_HOME}
su vagrant -c 'ansible-playbook -i environments/vagrant wipe.yml'
- su vagrant -c 'ansible-playbook -i environments/vagrant openwhisk.yml -e
invoker_use_runc=False'
+
+ export LEAN=#{ENV['LEAN']} || "false"
+ if [[ $LEAN == "true" ]]; then
+ # Deploy Lean Openwhisk (consolidated controller + invoker without
kafka, zookeeper etc.)
+ su vagrant -c 'ansible-playbook -i environments/vagrant openwhisk.yml -e
invoker_use_runc=False -e controller_akka_provider=local -e lean=true'
+ else
+ # Deploy full Openwhisk stack
+ su vagrant -c 'ansible-playbook -i environments/vagrant openwhisk.yml -e
invoker_use_runc=False'
+ fi
+
su vagrant -c 'ansible-playbook -i environments/vagrant postdeploy.yml'
su vagrant -c 'ansible-playbook -i environments/vagrant apigateway.yml'
su vagrant -c 'ansible-playbook -i environments/vagrant routemgmt.yml'