This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new fbe85b1  Adds a leaner OpenWhisk configuration. (#4216)
fbe85b1 is described below

commit fbe85b1a06e4cabbe47b487eabbce95e3d1d5f3d
Author: Pavel Kravchenko <[email protected]>
AuthorDate: Sat Feb 9 16:12:22 2019 +0200

    Adds a leaner OpenWhisk configuration. (#4216)
    
    This commit introduces a leaner configuration of the deployment that 
eschews Kafka & ZooKeeper. It merges the controller and invoker together and 
uses an in-memory bus to communicate between the controller and invoker. The 
leaner configuration reduces the amount of memory needed by a deployment.
---
 ansible/README.md                                  |  11 +
 ansible/group_vars/all                             |   1 +
 ansible/openwhisk.yml                              |   3 +
 ansible/roles/controller/tasks/deploy.yml          |   9 +
 ansible/roles/controller/tasks/lean.yml            |  40 +++
 ansible/roles/invoker/tasks/deploy.yml             |  48 ++--
 .../openwhisk/connector/lean/LeanConsumer.scala    |  49 ++++
 .../connector/lean/LeanMessagingProvider.scala     |  69 +++++
 .../openwhisk/connector/lean/LeanProducer.scala    |  60 +++++
 core/controller/Dockerfile                         |  23 +-
 core/controller/build.gradle                       |   1 +
 .../core/loadBalancer/CommonLoadBalancer.scala     | 282 ++++++++++++++++++++
 .../openwhisk/core/loadBalancer/LeanBalancer.scala | 104 ++++++++
 .../openwhisk/core/loadBalancer/LoadBalancer.scala |  26 +-
 .../ShardingContainerPoolBalancer.scala            | 289 +++------------------
 tests/build.gradle                                 |  37 ++-
 tests/performance/preparation/deploy-lean.sh       |  38 +++
 .../test/ShardingContainerPoolBalancerTests.scala  |   8 +-
 tools/travis/runLeanSystemTests.sh                 |  36 +++
 tools/travis/setupLeanSystem.sh                    |  33 +++
 tools/vagrant/README.md                            |   9 +
 tools/vagrant/Vagrantfile                          |  11 +-
 22 files changed, 891 insertions(+), 296 deletions(-)

diff --git a/ansible/README.md b/ansible/README.md
index eeca95f..13efbe9 100644
--- a/ansible/README.md
+++ b/ansible/README.md
@@ -280,6 +280,17 @@ This is usually not necessary, however in case you want to 
uninstall all prereqs
 ansible-playbook -i environments/<environment> prereq.yml -e mode=clean
 ```
 
+### Lean Setup
+To have a lean setup (no Kafka, Zookeeper and no Invokers as separate 
entities):
+
+At [Deploying Using CouchDB](ansible/README.md#deploying-using-cloudant) step, 
replace:
+```
+ansible-playbook -i environments/<environment> openwhisk.yml
+```
+by:
+```
+ansible-playbook -i environments/<environment> openwhisk.yml -e lean=true
+```
 
 ### Troubleshooting
 Some of the more common problems and their solution are listed here.
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 3e4b3ea..bdb9a70 100755
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -2,6 +2,7 @@
 # license agreements; and to You under the Apache License, Version 2.0.
 
 mode: deploy
+lean: false
 prompt_user: true
 openwhisk_home: "{{ lookup('env', 'OPENWHISK_HOME') | default(playbook_dir ~ 
'/..', true) }}"
 openwhisk_cli_home: "{{ lookup('env', 'OPENWHISK_CLI') | 
default(openwhisk_home ~ '/../incubator-openwhisk-cli', true) }}"
diff --git a/ansible/openwhisk.yml b/ansible/openwhisk.yml
index 11111e1..5f6583b 100644
--- a/ansible/openwhisk.yml
+++ b/ansible/openwhisk.yml
@@ -6,11 +6,14 @@
 # playbook (currently cloudant.yml or couchdb.yml).
 # It assumes that wipe.yml have being deployed at least once.
 
+
 - import_playbook: kafka.yml
+  when: not lean
 
 - import_playbook: controller.yml
 
 - import_playbook: invoker.yml
+  when: not lean
 
 - import_playbook: edge.yml
 
diff --git a/ansible/roles/controller/tasks/deploy.yml 
b/ansible/roles/controller/tasks/deploy.yml
index e13b6f9..7594e28 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -283,6 +283,11 @@
 - name: include plugins
   include_tasks: "{{ item }}.yml"
   with_items: "{{ controller_plugins | default([]) }}"
+  when: not lean
+
+- name: lean controller setup
+  include_tasks: "lean.yml"
+  when: lean
 
 - name: (re)start controller
   docker_container:
@@ -296,6 +301,10 @@
     env: "{{ env }}"
     volumes: "{{ controller_volumes }}"
     ports: "{{ ports_to_expose }}"
+    # userns_mode, pid_mode and privileged required when controller running in 
lean mode
+    userns_mode: "{{ userns_mode | default('') }}"
+    pid_mode: "{{ pid_mode | default('') }}"
+    privileged: "{{ privileged | default('no') }}"
     command:
       /bin/sh -c
       "exec /init.sh {{ controller_index }}
diff --git a/ansible/roles/controller/tasks/lean.yml 
b/ansible/roles/controller/tasks/lean.yml
new file mode 100644
index 0000000..809f13d
--- /dev/null
+++ b/ansible/roles/controller/tasks/lean.yml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more 
contributor
+# license agreements; and to You under the Apache License, Version 2.0.
+---
+# This plugin will  provide controller with Lean Controller parameters
+
+- name: set inventory_hostname to invoker and save controllers data that can 
be changed by invoker task
+  set_fact:
+    controller_env: "{{ env }}"
+    inventory_hostname: "invoker0"
+    invoker_index_base: 0
+    name_prefix: "invoker"
+    host_group: "{{ groups['invokers'] }}"
+
+- name: include invoker data
+  include_tasks: "../invoker/tasks/deploy.yml"
+
+- name: save invoker volumes
+  set_fact:
+    invoker_volumes: "{{ volumes.split(',') | reject('search','/logs') | 
reject('search','/conf') | reject('search','/coverage') | list }}"
+
+- name: populate volumes
+  set_fact:
+    controller_volumes: >-
+      {{ invoker_volumes }} +
+      {{ controller_volumes }}
+
+- name: populate environment variables for LEAN controller
+  vars:
+    lean_env:
+      "CONFIG_whisk_spi_MessagingProvider": 
"org.apache.openwhisk.connector.lean.LeanMessagingProvider"
+      "CONFIG_whisk_spi_LoadBalancerProvider": 
"org.apache.openwhisk.core.loadBalancer.LeanBalancer"
+  set_fact:
+    env: "{{ env | combine(controller_env) | combine(lean_env) }}"
+
+- name: provide extended docker container params for controller
+  set_fact:
+    userns_mode: "host"
+    pid_mode: "host"
+    privileged: "yes"
+
diff --git a/ansible/roles/invoker/tasks/deploy.yml 
b/ansible/roles/invoker/tasks/deploy.yml
index 9e37b03..2339110 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -26,25 +26,29 @@
 # must include a trailing '/'.
 #
 - name: "pull runtime action images per manifest"
-  shell: "docker pull {{runtimes_registry | 
default()}}{{item.prefix}}/{{item.name}}:{{item.tag | default()}}"
+  shell: "docker pull {{runtimes_registry | 
default()}}{{inv_item.prefix}}/{{inv_item.name}}:{{inv_item.tag | default()}}"
   loop: "{{ runtimesManifest.runtimes.values() | sum(start=[]) | 
selectattr('deprecated', 'equalto',false)  | map(attribute='image') | list | 
unique }}"
   when: skip_pull_runtimes is not defined or not (skip_pull_runtimes == True 
or skip_pull_runtimes.lower() == "true")
   register: result
   until: (result.rc == 0)
   retries: "{{ docker.pull.retries }}"
   delay: "{{ docker.pull.delay }}"
+  loop_control:
+    loop_var: inv_item
 
 ###
 # See comment above for pulling other runtime images.
 #
 - name: "pull blackboxes action images per manifest"
-  shell: "docker pull {{runtimes_registry | 
default()}}{{item.prefix}}/{{item.name}}:{{item.tag | default()}}"
+  shell: "docker pull {{runtimes_registry | 
default()}}{{inv_item.prefix}}/{{inv_item.name}}:{{inv_item.tag | default()}}"
   loop: "{{ runtimesManifest.blackboxes }}"
   when: skip_pull_runtimes is not defined or not (skip_pull_runtimes == True 
or skip_pull_runtimes.lower() == "true")
   register: result
   until: (result.rc == 0)
   retries: "{{ docker.pull.retries }}"
   delay: "{{ docker.pull.delay }}"
+  loop_control:
+    loop_var: inv_item
 
 - name: "determine docker root dir on docker-machine"
   uri:  url="http://{{ ansible_host }}:{{ docker.port }}/info" 
return_content=yes
@@ -89,24 +93,28 @@
 - name: copy keystore, key and cert
   when: invoker.protocol == "https"
   copy:
-    src: "{{ item }}"
+    src: "{{ inv_item }}"
     mode: 0666
     dest: "{{ invoker.confdir }}/{{ invoker_name }}"
   become: "{{ invoker.dir.become }}"
   with_items:
-  - "files/{{ invoker.ssl.keystore.name }}"
-  - "files/{{ invoker.ssl.key }}"
-  - "files/{{ invoker.ssl.cert }}"
+  - "{{ openwhisk_home }}/ansible/roles/invoker/files/{{ 
invoker.ssl.keystore.name }}"
+  - "{{ openwhisk_home }}/ansible/roles/invoker/files/{{ invoker.ssl.key }}"
+  - "{{ openwhisk_home }}/ansible/roles/invoker/files/{{ invoker.ssl.cert }}"
+  loop_control:
+    loop_var: inv_item
 
 - name: check, that required databases exist
   include_tasks: "{{ openwhisk_home }}/ansible/tasks/db/checkDb.yml"
   vars:
-    dbName: "{{ item }}"
+    dbName: "{{ inv_item }}"
     dbUser: "{{ db.credentials.invoker.user }}"
     dbPass: "{{ db.credentials.invoker.pass }}"
   with_items:
   - "{{ db.whisk.actions }}"
   - "{{ db.whisk.activations }}"
+  loop_control:
+    loop_var: inv_item
 
 - name: get running invoker information
   uri: url="http://{{ ansible_host }}:{{ docker.port 
}}/containers/json?filters={{ '{"name":[ "invoker" ],"ancestor":[ "invoker" ]}' 
| urlencode }}" return_content=yes
@@ -141,9 +149,11 @@
 
 - name: determine if index of invoker is same with index of inventory host
   fail:
-    msg: "invoker index is invalid. expected: /invoker{{ 
groups['invokers'].index(inventory_hostname) }} found: {{ item.Names[0] }}"
+    msg: "invoker index is invalid. expected: /invoker{{ 
groups['invokers'].index(inventory_hostname) }} found: {{ inv_item.Names[0] }}"
   with_items: "{{ invokerInfo }}"
-  when: not invoker.allowMultipleInstances and item.Names[0] != "/{{ 
invoker_name }}"
+  when: not invoker.allowMultipleInstances and inv_item.Names[0] != "/{{ 
invoker_name }}"
+  loop_control:
+    loop_var: inv_item
 
 - name: copy jmxremote password file
   when: jmx.enabled
@@ -166,12 +176,12 @@
 
 - name: prepare invoker ports
   set_fact:
-    ports_to_expose: ["{{ invoker.port + (invoker_index | int) }}:8080"]
+    invoker_ports_to_expose: ["{{ invoker.port + (invoker_index | int) 
}}:8080"]
 
 - name: expose additional ports if jmxremote is enabled
   when: jmx.enabled
   set_fact:
-    ports_to_expose: "{{ ports_to_expose }} + [ \"{{ jmx.basePortInvoker + 
(invoker_index | int) }}:{{ jmx.basePortInvoker + (invoker_index | int) }}\" ] 
+ [ \"{{ jmx.rmiBasePortInvoker + (invoker_index | int) }}:{{ 
jmx.rmiBasePortInvoker + (invoker_index | int) }}\" ]"
+    invoker_ports_to_expose: "{{ invoker_ports_to_expose }} + [ \"{{ 
jmx.basePortInvoker + (invoker_index | int) }}:{{ jmx.basePortInvoker + 
(invoker_index | int) }}\" ] + [ \"{{ jmx.rmiBasePortInvoker + (invoker_index | 
int) }}:{{ jmx.rmiBasePortInvoker + (invoker_index | int) }}\" ]"
 
 - name: populate environment variables for invoker
   set_fact:
@@ -249,16 +259,20 @@
 
 - name: extend invoker dns env
   set_fact:
-    env: "{{ env | default({}) | combine( 
{'CONFIG_whisk_containerFactory_containerArgs_dnsServers_' ~ item.0: item.1} ) 
}}"
+    env: "{{ env | default({}) | combine( 
{'CONFIG_whisk_containerFactory_containerArgs_dnsServers_' ~ inv_item.0: 
inv_item.1} ) }}"
   with_indexed_items: "{{ (invoker_container_network_dns_servers | 
default()).split(' ')}}"
+  loop_control:
+    loop_var: inv_item
 
 - name: merge extra env variables
   set_fact:
     env: "{{ env | combine(invoker.extraEnv) }}"
 
 - name: include plugins
-  include_tasks: "{{ item }}.yml"
+  include_tasks: "{{ inv_item }}.yml"
   with_items: "{{ invoker_plugins | default([]) }}"
+  loop_control:
+    loop_var: inv_item
 
 - name: set invoker volumes
   set_fact:
@@ -285,7 +299,7 @@
 
 - name: ensure invoker coverage directory is created with permissions
   file:
-    path: "{{ coverage_logs_dir }}/invoker/{{ item }}"
+    path: "{{ coverage_logs_dir }}/invoker/{{ inv_item }}"
     state: directory
     mode: 0777
   with_items:
@@ -293,6 +307,8 @@
     - common
   become: "{{ logs.dir.become }}"
   when: coverage_enabled
+  loop_control:
+    loop_var: inv_item
 
 - name: extend invoker volume for coverage
   set_fact:
@@ -312,8 +328,9 @@
     recreate: true
     env: "{{ env }}"
     volumes: "{{ volumes }}"
-    ports: "{{ ports_to_expose }}"
+    ports: "{{ invoker_ports_to_expose }}"
     command: /bin/sh -c "exec /init.sh --id {{ invoker_index }} --uniqueName 
{{ invoker_index }} >> /logs/{{ invoker_name }}_logs.log 2>&1"
+  when: not lean
 
 - name: wait until Invoker is up and running
   uri:
@@ -325,3 +342,4 @@
   until: result.status == 200
   retries: 12
   delay: 5
+  when: not lean
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanConsumer.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanConsumer.scala
new file mode 100644
index 0000000..98b1ace
--- /dev/null
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanConsumer.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.connector.lean
+
+import scala.concurrent.duration._
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.connector.MessageConsumer
+import java.util.concurrent.BlockingQueue
+import java.util.concurrent.TimeUnit
+
+class LeanConsumer(queue: BlockingQueue[Array[Byte]], override val maxPeek: 
Int)(implicit logging: Logging)
+    extends MessageConsumer {
+
+  /**
+   * Long poll for messages. Method returns once message available but no 
later than given
+   * duration.
+   *
+   * @param duration the maximum duration for the long poll
+   */
+  override def peek(duration: FiniteDuration, retry: Int): Iterable[(String, 
Int, Long, Array[Byte])] = {
+    Option(queue.poll(duration.toMillis, TimeUnit.MILLISECONDS))
+      .map(record => Iterable(("", 0, 0L, record)))
+      .getOrElse(Iterable.empty)
+  }
+
+  /**
+   * There's no cursor to advance since that's done in the poll above.
+   */
+  override def commit(retry: Int): Unit = { /*do nothing*/ }
+
+  override def close(): Unit = {
+    logging.info(this, s"closing lean consumer")
+  }
+}
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanMessagingProvider.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanMessagingProvider.scala
new file mode 100644
index 0000000..2bcf271
--- /dev/null
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanMessagingProvider.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.connector.lean
+
+import java.util.concurrent.BlockingQueue
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.mutable.Map
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration.FiniteDuration
+import scala.util.Success
+import scala.util.Try
+
+import akka.actor.ActorSystem
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.connector.MessageConsumer
+import org.apache.openwhisk.core.connector.MessageProducer
+import org.apache.openwhisk.core.connector.MessagingProvider
+import org.apache.openwhisk.core.entity.ByteSize
+
+/**
+ * A simple implementation of MessagingProvider.
+ */
+object LeanMessagingProvider extends MessagingProvider {
+
+  /** Map to hold message queues, the key is the topic */
+  val queues: Map[String, BlockingQueue[Array[Byte]]] =
+    new TrieMap[String, BlockingQueue[Array[Byte]]]
+
+  def getConsumer(config: WhiskConfig, groupId: String, topic: String, 
maxPeek: Int, maxPollInterval: FiniteDuration)(
+    implicit logging: Logging,
+    actorSystem: ActorSystem): MessageConsumer = {
+
+    val queue = queues.getOrElseUpdate(topic, new 
LinkedBlockingQueue[Array[Byte]]())
+
+    new LeanConsumer(queue, maxPeek)
+  }
+
+  def getProducer(config: WhiskConfig, maxRequestSize: Option[ByteSize] = 
None)(
+    implicit logging: Logging,
+    actorSystem: ActorSystem): MessageProducer =
+    new LeanProducer(queues)
+
+  def ensureTopic(config: WhiskConfig, topic: String, topicConfigKey: String, 
maxMessageBytes: Option[ByteSize] = None)(
+    implicit logging: Logging): Try[Unit] = {
+    if (queues.contains(topic)) {
+      Success(logging.info(this, s"topic $topic already existed"))
+    } else {
+      queues.put(topic, new LinkedBlockingQueue[Array[Byte]]())
+      Success(logging.info(this, s"topic $topic created"))
+    }
+  }
+}
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanProducer.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanProducer.scala
new file mode 100644
index 0000000..e555e85
--- /dev/null
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanProducer.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.connector.lean
+
+import akka.actor.ActorSystem
+import scala.concurrent.Future
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.TopicPartition
+import org.apache.openwhisk.common.Counter
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.connector.Message
+import org.apache.openwhisk.core.connector.MessageProducer
+
+import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
+import scala.collection.mutable.Map
+import java.nio.charset.StandardCharsets
+import scala.concurrent.ExecutionContext
+
+class LeanProducer(queues: Map[String, BlockingQueue[Array[Byte]]])(implicit 
logging: Logging, actorSystem: ActorSystem)
+    extends MessageProducer {
+
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+
+  override def sentCount(): Long = sentCounter.cur
+
+  /** Sends msg to topic. This is an asynchronous operation. */
+  override def send(topic: String, msg: Message, retry: Int = 3): 
Future[RecordMetadata] = {
+    implicit val transid = msg.transid
+
+    val queue = queues.getOrElseUpdate(topic, new 
LinkedBlockingQueue[Array[Byte]]())
+
+    Future {
+      queue.put(msg.serialize.getBytes(StandardCharsets.UTF_8))
+      sentCounter.next()
+      new RecordMetadata(new TopicPartition(topic, 0), -1, -1, 
System.currentTimeMillis(), null, -1, -1)
+    }
+  }
+
+  /** Closes producer. */
+  override def close(): Unit = {
+    logging.info(this, "closing lean producer")
+  }
+
+  private val sentCounter = new Counter()
+}
diff --git a/core/controller/Dockerfile b/core/controller/Dockerfile
index 85950d9..b540124 100644
--- a/core/controller/Dockerfile
+++ b/core/controller/Dockerfile
@@ -8,6 +8,25 @@ ENV UID=1001 \
 ENV 
SWAGGER_UI_DOWNLOAD_SHA256=3d7ef5ddc59e10f132fe99771498f0f1ba7a2cbfb9585f9863d4191a574c96e7
 \
     SWAGGER_UI_VERSION=3.6.0
 
+###################################################################################################
+# It's needed for lean mode where the controller is also an invoker
+###################################################################################################
+ENV DOCKER_VERSION=1.12.0 \
+    
DOCKER_DOWNLOAD_SHA256=3dd07f65ea4a7b4c8829f311ab0213bca9ac551b5b24706f3e79a97e22097f8b
+
+RUN apk add --update openssl
+
+# Uncomment to fetch latest version of docker instead: RUN wget -qO- 
https://get.docker.com | sh
+# Install docker client
+RUN curl -sSL -o docker-${DOCKER_VERSION}.tgz 
https://get.docker.com/builds/Linux/x86_64/docker-${DOCKER_VERSION}.tgz && \
+echo "${DOCKER_DOWNLOAD_SHA256}  docker-${DOCKER_VERSION}.tgz" | sha256sum -c 
- && \
+tar --strip-components 1 -xvzf docker-${DOCKER_VERSION}.tgz -C /usr/bin 
docker/docker && \
+tar --strip-components 1 -xvzf docker-${DOCKER_VERSION}.tgz -C /usr/bin 
docker/docker-runc && \
+rm -f docker-${DOCKER_VERSION}.tgz && \
+chmod +x /usr/bin/docker && \
+chmod +x /usr/bin/docker-runc
+##################################################################################################
+
 # Install swagger-ui
 RUN curl -sSL -o swagger-ui-v${SWAGGER_UI_VERSION}.tar.gz --no-verbose 
https://github.com/swagger-api/swagger-ui/archive/v${SWAGGER_UI_VERSION}.tar.gz 
&& \
     echo "${SWAGGER_UI_DOWNLOAD_SHA256}  
swagger-ui-v${SWAGGER_UI_VERSION}.tar.gz" | sha256sum -c - && \
@@ -23,7 +42,9 @@ COPY init.sh /
 RUN chmod +x init.sh
 
 RUN adduser -D -u ${UID} -h /home/${NOT_ROOT_USER} -s /bin/bash 
${NOT_ROOT_USER}
-USER ${NOT_ROOT_USER}
+
+# It is possible to run as non root if you dont need invoker capabilities out 
of the controller today
+#USER ${NOT_ROOT_USER}
 
 EXPOSE 8080
 CMD ["./init.sh", "0"]
diff --git a/core/controller/build.gradle b/core/controller/build.gradle
index 243c776..0059157 100644
--- a/core/controller/build.gradle
+++ b/core/controller/build.gradle
@@ -43,6 +43,7 @@ dependencies {
     compile 
'com.lightbend.akka.discovery:akka-discovery-kubernetes-api_2.12:0.11.0'
     compile 
'com.lightbend.akka.discovery:akka-discovery-marathon-api_2.12:0.11.0'
     compile project(':common:scala')
+    compile project(':core:invoker')
     scoverage gradle.scoverage.deps
 }
 
diff --git 
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
 
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
new file mode 100644
index 0000000..197e3d1
--- /dev/null
+++ 
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.loadBalancer
+
+import akka.actor.ActorRef
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.LongAdder
+
+import akka.actor.ActorSystem
+import akka.event.Logging.InfoLevel
+import akka.stream.ActorMaterializer
+import org.apache.kafka.clients.producer.RecordMetadata
+import pureconfig._
+import org.apache.openwhisk.common.LoggingMarkers._
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success}
+
+/**
+ * Abstract class which provides common logic for all LoadBalancer 
implementations.
+ */
+abstract class CommonLoadBalancer(config: WhiskConfig,
+                                  feedFactory: FeedFactory,
+                                  controllerInstance: 
ControllerInstanceId)(implicit val actorSystem: ActorSystem,
+                                                                            
logging: Logging,
+                                                                            
materializer: ActorMaterializer,
+                                                                            
messagingProvider: MessagingProvider)
+    extends LoadBalancer {
+
+  protected implicit val executionContext: ExecutionContext = 
actorSystem.dispatcher
+
+  val lbConfig: ShardingContainerPoolBalancerConfig =
+    
loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer)
+  protected val invokerPool: ActorRef
+
+  /** State related to invocations and throttling */
+  protected[loadBalancer] val activationSlots = TrieMap[ActivationId, 
ActivationEntry]()
+  protected[loadBalancer] val activationPromises =
+    TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
+  protected val activationsPerNamespace = TrieMap[UUID, LongAdder]()
+  protected val totalActivations = new LongAdder()
+  protected val totalBlackBoxActivationMemory = new LongAdder()
+  protected val totalManagedActivationMemory = new LongAdder()
+
+  protected def emitHistogramMetric() = {
+    
MetricEmitter.emitHistogramMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance),
 totalActivations.longValue)
+    MetricEmitter.emitHistogramMetric(
+      LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, ""),
+      totalBlackBoxActivationMemory.longValue + 
totalManagedActivationMemory.longValue)
+    MetricEmitter.emitHistogramMetric(
+      LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, "Blackbox"),
+      totalBlackBoxActivationMemory.longValue)
+    MetricEmitter.emitHistogramMetric(
+      LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, "Managed"),
+      totalManagedActivationMemory.longValue)
+  }
+
+  actorSystem.scheduler.schedule(0.seconds, 10.seconds)(emitHistogramMetric())
+
+  override def activeActivationsFor(namespace: UUID): Future[Int] =
+    
Future.successful(activationsPerNamespace.get(namespace).map(_.intValue()).getOrElse(0))
+  override def totalActiveActivations: Future[Int] = 
Future.successful(totalActivations.intValue())
+
+  /**
+   * 2. Update local state with the activation to be executed scheduled.
+   *
+   * All activations are tracked in the activationSlots map. Additionally, 
blocking invokes
+   * are tracked in the activation results map. When a result is received via 
activeack, it
+   * will cause the result to be forwarded to the caller waiting on the 
result, and cancel
+   * the DB poll which is also trying to do the same.
+   */
+  protected def setupActivation(msg: ActivationMessage,
+                                action: ExecutableWhiskActionMetaData,
+                                instance: InvokerInstanceId): 
Future[Either[ActivationId, WhiskActivation]] = {
+
+    totalActivations.increment()
+    val isBlackboxInvocation = action.exec.pull
+    val totalActivationMemory =
+      if (isBlackboxInvocation) totalBlackBoxActivationMemory else 
totalManagedActivationMemory
+    totalActivationMemory.add(action.limits.memory.megabytes)
+
+    activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new 
LongAdder()).increment()
+
+    // Timeout is a multiple of the configured maximum action duration. The 
minimum timeout is the configured standard
+    // value for action durations to avoid too tight timeouts.
+    // Timeouts in general are diluted by a configurable factor. In essence 
this factor controls how much slack you want
+    // to allow in your topics before you start reporting failed activations.
+    val timeout = (action.limits.timeout.duration.max(TimeLimit.STD_DURATION) 
* lbConfig.timeoutFactor) + 1.minute
+
+    val resultPromise = if (msg.blocking) {
+      activationPromises.getOrElseUpdate(msg.activationId, 
Promise[Either[ActivationId, WhiskActivation]]()).future
+    } else Future.successful(Left(msg.activationId))
+
+    // Install a timeout handler for the catastrophic case where an active ack 
is not received at all
+    // (because say an invoker is down completely, or the connection to the 
message bus is disrupted) or when
+    // the active ack is significantly delayed (possibly dues to long queues 
but the subject should not be penalized);
+    // in this case, if the activation handler is still registered, remove it 
and update the books.
+    activationSlots.getOrElseUpdate(
+      msg.activationId, {
+        val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
+          processCompletion(msg.activationId, msg.transid, forced = true, 
isSystemError = false, invoker = instance)
+        }
+
+        // please note: timeoutHandler.cancel must be called on all 
non-timeout paths, e.g. Success
+        ActivationEntry(
+          msg.activationId,
+          msg.user.namespace.uuid,
+          instance,
+          action.limits.memory.megabytes.MB,
+          action.limits.concurrency.maxConcurrent,
+          action.fullyQualifiedName(true),
+          timeoutHandler,
+          isBlackboxInvocation)
+      })
+
+    resultPromise
+  }
+
+  protected val messageProducer =
+    messagingProvider.getProducer(config, 
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  /** 3. Send the activation to the invoker */
+  protected def sendActivationToInvoker(producer: MessageProducer,
+                                        msg: ActivationMessage,
+                                        invoker: InvokerInstanceId): 
Future[RecordMetadata] = {
+    implicit val transid: TransactionId = msg.transid
+
+    val topic = s"invoker${invoker.toInt}"
+
+    
MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)
+    val start = transid.started(
+      this,
+      LoggingMarkers.CONTROLLER_KAFKA,
+      s"posting topic '$topic' with activation id '${msg.activationId}'",
+      logLevel = InfoLevel)
+
+    producer.send(topic, msg).andThen {
+      case Success(status) =>
+        transid.finished(
+          this,
+          start,
+          s"posted to 
${status.topic()}[${status.partition()}][${status.offset()}]",
+          logLevel = InfoLevel)
+      case Failure(_) => transid.failed(this, start, s"error on posting to 
topic $topic")
+    }
+  }
+
+  /**
+   * Subscribes to active acks (completion messages from the invokers), and
+   * registers a handler for received active acks from invokers.
+   */
+  private val activationFeed: ActorRef =
+    feedFactory.createFeed(actorSystem, messagingProvider, 
processAcknowledgement)
+
+  /** 4. Get the active-ack message and parse it */
+  protected[loadBalancer] def processAcknowledgement(bytes: Array[Byte]): 
Future[Unit] = Future {
+    val raw = new String(bytes, StandardCharsets.UTF_8)
+    AcknowledegmentMessage.parse(raw) match {
+      case Success(m: CompletionMessage) =>
+        processCompletion(
+          m.activationId,
+          m.transid,
+          forced = false,
+          isSystemError = m.isSystemError,
+          invoker = m.invoker)
+        activationFeed ! MessageFeed.Processed
+
+      case Success(m: ResultMessage) =>
+        processResult(m.response, m.transid)
+        activationFeed ! MessageFeed.Processed
+
+      case Failure(t) =>
+        activationFeed ! MessageFeed.Processed
+        logging.error(this, s"failed processing message: $raw")
+
+      case _ =>
+        activationFeed ! MessageFeed.Processed
+        logging.error(this, s"Unexpected Acknowledgment message received by 
loadbalancer: $raw")
+    }
+  }
+
+  /** 5. Process the result ack and return it to the user */
+  protected def processResult(response: Either[ActivationId, WhiskActivation], 
tid: TransactionId): Unit = {
+    val aid = response.fold(l => l, r => r.activationId)
+
+    // Resolve the promise to send the result back to the user.
+    // The activation will be removed from the activation slots later, when 
the completion message
+    // is received (because the slot in the invoker is not yet free for new 
activations).
+    activationPromises.remove(aid).foreach(_.trySuccess(response))
+    logging.info(this, s"received result ack for '$aid'")(tid)
+  }
+
+  protected def releaseInvoker(invoker: InvokerInstanceId, entry: 
ActivationEntry)
+
+  /** 6. Process the completion ack and update the state */
+  protected[loadBalancer] def processCompletion(aid: ActivationId,
+                                                tid: TransactionId,
+                                                forced: Boolean,
+                                                isSystemError: Boolean,
+                                                invoker: InvokerInstanceId): 
Unit = {
+
+    val invocationResult = if (forced) {
+      InvocationFinishedResult.Timeout
+    } else {
+      // If the response contains a system error, report that, otherwise 
report Success
+      // Left generally is considered a Success, since that could be a message 
not fitting into Kafka
+      if (isSystemError) {
+        InvocationFinishedResult.SystemError
+      } else {
+        InvocationFinishedResult.Success
+      }
+    }
+
+    activationSlots.remove(aid) match {
+      case Some(entry) =>
+        totalActivations.decrement()
+        val totalActivationMemory =
+          if (entry.isBlackbox) totalBlackBoxActivationMemory else 
totalManagedActivationMemory
+        totalActivationMemory.add(entry.memory.toMB * (-1))
+        activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
+
+        releaseInvoker(invoker, entry)
+
+        if (!forced) {
+          entry.timeoutHandler.cancel()
+          // notice here that the activationPromises is not touched, because 
the expectation is that
+          // the active ack is received as expected, and processing that 
message removed the promise
+          // from the corresponding map
+        } else {
+          // the entry has timed out; if the active ack is still around, 
remove its entry also
+          // and complete the promise with a failure if necessary
+          activationPromises
+            .remove(aid)
+            .foreach(_.tryFailure(new Throwable("no completion or active ack 
received yet")))
+        }
+
+        logging.info(this, s"${if (!forced) "received" else "forced"} 
completion ack for '$aid'")(tid)
+        // Active acks that are received here are strictly from user actions - 
health actions are not part of
+        // the load balancer's activation map. Inform the invoker pool 
supervisor of the user action completion.
+        // guard this
+        invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
+      case None if tid == TransactionId.invokerHealth =>
+        // Health actions do not have an ActivationEntry as they are written 
on the message bus directly. Their result
+        // is important to pass to the invokerPool because they are used to 
determine if the invoker can be considered
+        // healthy again.
+        logging.info(this, s"received completion ack for health action on 
$invoker")(tid)
+        // guard this
+        invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
+      case None if !forced =>
+        // Received an active-ack that has already been taken out of the state 
because of a timeout (forced active-ack).
+        // The result is ignored because a timeout has already been reported 
to the invokerPool per the force.
+        logging.debug(this, s"received completion ack for '$aid' which has no 
entry")(tid)
+      case None =>
+        // The entry has already been removed by an active ack. This part of 
the code is reached by the timeout and can
+        // happen if active-ack and timeout happen roughly at the same time 
(the timeout was triggered before the active
+        // ack canceled the timer). As the active ack is already processed we 
don't have to do anything here.
+        logging.debug(this, s"forced completion ack for '$aid' which has no 
entry")(tid)
+    }
+  }
+}
diff --git 
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LeanBalancer.scala
 
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LeanBalancer.scala
new file mode 100644
index 0000000..130203a
--- /dev/null
+++ 
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LeanBalancer.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.loadBalancer
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.stream.ActorMaterializer
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.WhiskConfig._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.containerpool.ContainerPoolConfig
+import org.apache.openwhisk.core.entity.ControllerInstanceId
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.invoker.InvokerReactive
+import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.ExecutionContextFactory
+import pureconfig._
+import org.apache.openwhisk.core.entity.size._
+
+import scala.concurrent.Future
+
+/**
+ * Lean loadbalancer implementation.
+ *
+ * Communicates with the Invoker directly, without Kafka in the middle. The 
Invoker does not exist as a separate entity; it is built together with the Controller.
+ * Uses LeanMessagingProvider to provide an in-memory queue instead of Kafka.
+ */
+class LeanBalancer(config: WhiskConfig,
+                   feedFactory: FeedFactory,
+                   controllerInstance: ControllerInstanceId,
+                   implicit val messagingProvider: MessagingProvider = 
SpiLoader.get[MessagingProvider])(
+  implicit actorSystem: ActorSystem,
+  logging: Logging,
+  materializer: ActorMaterializer)
+    extends CommonLoadBalancer(config, feedFactory, controllerInstance) {
+
+  /** Loadbalancer interface methods */
+  override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = 
Future.successful(IndexedSeq.empty[InvokerHealth])
+  override def clusterSize: Int = 1
+
+  val poolConfig: ContainerPoolConfig = 
loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)
+
+  val invokerName = InvokerInstanceId(0, None, None, poolConfig.userMemory)
+
+  /** 1. Publish a message to the loadbalancer */
+  override def publish(action: ExecutableWhiskActionMetaData, msg: 
ActivationMessage)(
+    implicit transid: TransactionId): Future[Future[Either[ActivationId, 
WhiskActivation]]] = {
+
+    /** 2. Update local state with the activation to be executed scheduled. */
+    val activationResult = setupActivation(msg, action, invokerName)
+    sendActivationToInvoker(messageProducer, msg, invokerName).map(_ => 
activationResult)
+  }
+
+  /** Creates an invoker for executing user actions. There is only one invoker 
in the lean model. */
+  private def makeALocalThreadedInvoker() {
+    implicit val ec = 
ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+    val actorSystema: ActorSystem =
+      ActorSystem(name = "invoker-actor-system", defaultExecutionContext = 
Some(ec))
+    new InvokerReactive(config, invokerName, messageProducer)(actorSystema, 
implicitly)
+  }
+
+  makeALocalThreadedInvoker()
+
+  override protected val invokerPool: ActorRef = 
actorSystem.actorOf(Props.empty)
+
+  override protected def releaseInvoker(invoker: InvokerInstanceId, entry: 
ActivationEntry) = {
+    // Currently do nothing
+  }
+
+  override protected def emitHistogramMetric() = {
+    super.emitHistogramMetric()
+  }
+}
+
+object LeanBalancer extends LoadBalancerProvider {
+
+  override def instance(whiskConfig: WhiskConfig, instance: 
ControllerInstanceId)(
+    implicit actorSystem: ActorSystem,
+    logging: Logging,
+    materializer: ActorMaterializer): LoadBalancer = {
+
+    new LeanBalancer(whiskConfig, createFeedFactory(whiskConfig, instance), 
instance)
+  }
+
+  def requiredProperties =
+    Map(runtimesRegistry -> "") ++
+      ExecManifest.requiredProperties ++
+      wskApiHost
+}
diff --git 
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
 
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
index da1e1b4..041ce0c 100644
--- 
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
+++ 
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
@@ -18,13 +18,14 @@
 package org.apache.openwhisk.core.loadBalancer
 
 import scala.concurrent.Future
-import akka.actor.ActorSystem
+import akka.actor.{ActorRefFactory, ActorSystem, Props}
 import akka.stream.ActorMaterializer
 import org.apache.openwhisk.common.{Logging, TransactionId}
 import org.apache.openwhisk.core.WhiskConfig
 import org.apache.openwhisk.core.connector._
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.spi.Spi
+import scala.concurrent.duration._
 
 /**
  * Describes an abstract invoker. An invoker is a local container pool manager 
that
@@ -85,6 +86,29 @@ trait LoadBalancerProvider extends Spi {
   def instance(whiskConfig: WhiskConfig, instance: 
ControllerInstanceId)(implicit actorSystem: ActorSystem,
                                                                          
logging: Logging,
                                                                          
materializer: ActorMaterializer): LoadBalancer
+
+  /** Return default FeedFactory */
+  def createFeedFactory(whiskConfig: WhiskConfig, instance: 
ControllerInstanceId)(implicit actorSystem: ActorSystem,
+                                                                               
   logging: Logging): FeedFactory = {
+
+    val activeAckTopic = s"completed${instance.asString}"
+    val maxActiveAcksPerPoll = 128
+    val activeAckPollDuration = 1.second
+
+    new FeedFactory {
+      def createFeed(f: ActorRefFactory, provider: MessagingProvider, acker: 
Array[Byte] => Future[Unit]) = {
+        f.actorOf(Props {
+          new MessageFeed(
+            "activeack",
+            logging,
+            provider.getConsumer(whiskConfig, activeAckTopic, activeAckTopic, 
maxPeek = maxActiveAcksPerPoll),
+            maxActiveAcksPerPoll,
+            activeAckPollDuration,
+            acker)
+        })
+      }
+    }
+  }
 }
 
 /** Exception thrown by the loadbalancer */
diff --git 
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
 
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 914b3ac..0464393 100644
--- 
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ 
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -19,34 +19,27 @@ package org.apache.openwhisk.core.loadBalancer
 
 import akka.actor.ActorRef
 import akka.actor.ActorRefFactory
-import java.nio.charset.StandardCharsets
 import java.util.concurrent.ThreadLocalRandom
-import java.util.concurrent.atomic.LongAdder
 
 import akka.actor.{Actor, ActorSystem, Cancellable, Props}
 import akka.cluster.ClusterEvent._
 import akka.cluster.{Cluster, Member, MemberStatus}
-import akka.event.Logging.InfoLevel
 import akka.management.AkkaManagement
 import akka.management.cluster.bootstrap.ClusterBootstrap
 import akka.stream.ActorMaterializer
 import org.apache.kafka.clients.producer.RecordMetadata
 import pureconfig._
-import org.apache.openwhisk.common.LoggingMarkers._
 import org.apache.openwhisk.common._
 import org.apache.openwhisk.core.WhiskConfig._
 import org.apache.openwhisk.core.connector._
 import org.apache.openwhisk.core.entity._
-import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.common.LoggingMarkers._
 import org.apache.openwhisk.core.loadBalancer.InvokerState.{Healthy, Offline, 
Unhealthy, Unresponsive}
 import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.apache.openwhisk.spi.SpiLoader
 
 import scala.annotation.tailrec
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.{Failure, Success}
+import scala.concurrent.Future
 
 /**
  * A loadbalancer that schedules workload based on a hashing-algorithm.
@@ -151,17 +144,13 @@ import scala.util.{Failure, Success}
 class ShardingContainerPoolBalancer(
   config: WhiskConfig,
   controllerInstance: ControllerInstanceId,
-  private val feedFactory: FeedFactory,
-  private val invokerPoolFactory: InvokerPoolFactory,
-  lbConfig: ShardingContainerPoolBalancerConfig =
-    
loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer),
-  private val messagingProvider: MessagingProvider = 
SpiLoader.get[MessagingProvider])(
-  implicit val actorSystem: ActorSystem,
+  feedFactory: FeedFactory,
+  val invokerPoolFactory: InvokerPoolFactory,
+  implicit val messagingProvider: MessagingProvider = 
SpiLoader.get[MessagingProvider])(
+  implicit actorSystem: ActorSystem,
   logging: Logging,
   materializer: ActorMaterializer)
-    extends LoadBalancer {
-
-  private implicit val executionContext: ExecutionContext = 
actorSystem.dispatcher
+    extends CommonLoadBalancer(config, feedFactory, controllerInstance) {
 
   /** Build a cluster of all loadbalancers */
   private val cluster: Option[Cluster] = if 
(loadConfigOrThrow[ClusterConfig](ConfigKeys.cluster).useClusterBootstrap) {
@@ -174,45 +163,26 @@ class ShardingContainerPoolBalancer(
     None
   }
 
-  /** State related to invocations and throttling */
-  protected[loadBalancer] val activationSlots = TrieMap[ActivationId, 
ActivationEntry]()
-  protected[loadBalancer] val activationPromises =
-    TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
-  private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
-  private val totalActivations = new LongAdder()
-  private val totalBlackBoxActivationMemory = new LongAdder()
-  private val totalManagedActivationMemory = new LongAdder()
-
-  /** State needed for scheduling. */
-  protected[loadBalancer] val schedulingState = 
ShardingContainerPoolBalancerState()(lbConfig)
-
-  actorSystem.scheduler.schedule(0.seconds, 10.seconds) {
-    
MetricEmitter.emitHistogramMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance),
 totalActivations.longValue)
-    MetricEmitter.emitHistogramMetric(
-      LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, ""),
-      totalBlackBoxActivationMemory.longValue + 
totalManagedActivationMemory.longValue)
+  override protected def emitHistogramMetric() = {
+    super.emitHistogramMetric()
     MetricEmitter.emitHistogramMetric(
-      LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, "Blackbox"),
-      totalBlackBoxActivationMemory.longValue)
-    MetricEmitter.emitHistogramMetric(
-      LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, "Managed"),
-      totalManagedActivationMemory.longValue)
-    MetricEmitter.emitHistogramMetric(INVOKER_TOTALMEM_BLACKBOX, 
schedulingState.blackboxInvokers.foldLeft(0L) {
-      (total, curr) =>
+      INVOKER_TOTALMEM_BLACKBOX,
+      schedulingState.blackboxInvokers.foldLeft(0L) { (total, curr) =>
         if (curr.status.isUsable) {
           curr.id.userMemory.toMB + total
         } else {
           total
         }
-    })
-    MetricEmitter.emitHistogramMetric(INVOKER_TOTALMEM_MANAGED, 
schedulingState.managedInvokers.foldLeft(0L) {
-      (total, curr) =>
+      })
+    MetricEmitter.emitHistogramMetric(
+      INVOKER_TOTALMEM_MANAGED,
+      schedulingState.managedInvokers.foldLeft(0L) { (total, curr) =>
         if (curr.status.isUsable) {
           curr.id.userMemory.toMB + total
         } else {
           total
         }
-    })
+      })
     MetricEmitter.emitHistogramMetric(
       HEALTHY_INVOKER_MANAGED,
       schedulingState.managedInvokers.count(_.status == Healthy))
@@ -239,6 +209,9 @@ class ShardingContainerPoolBalancer(
       schedulingState.blackboxInvokers.count(_.status == Offline))
   }
 
+  /** State needed for scheduling. */
+  val schedulingState = ShardingContainerPoolBalancerState()(lbConfig)
+
   /**
    * Monitors invoker supervision and the cluster to update the state 
sequentially
    *
@@ -283,9 +256,6 @@ class ShardingContainerPoolBalancer(
 
   /** Loadbalancer interface methods */
   override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = 
Future.successful(schedulingState.invokers)
-  override def activeActivationsFor(namespace: UUID): Future[Int] =
-    
Future.successful(activationsPerNamespace.get(namespace).map(_.intValue()).getOrElse(0))
-  override def totalActiveActivations: Future[Int] = 
Future.successful(totalActivations.intValue())
   override def clusterSize: Int = schedulingState.clusterSize
 
   /** 1. Publish a message to the loadbalancer */
@@ -344,201 +314,19 @@ class ShardingContainerPoolBalancer(
       }
   }
 
-  /**
-   * 2. Update local state with the to be executed activation.
-   *
-   * All activations are tracked in the activationSlots map. Additionally, 
blocking invokes
-   * are tracked in the activation results map. When a result is received via 
activeack, it
-   * will cause the result to be forwarded to the caller waiting on the 
result, and cancel
-   * the DB poll which is also trying to do the same.
-   */
-  private def setupActivation(msg: ActivationMessage,
-                              action: ExecutableWhiskActionMetaData,
-                              instance: InvokerInstanceId): 
Future[Either[ActivationId, WhiskActivation]] = {
-
-    totalActivations.increment()
-    val isBlackboxInvocation = action.exec.pull
-    val totalActivationMemory =
-      if (isBlackboxInvocation) totalBlackBoxActivationMemory else 
totalManagedActivationMemory
-    totalActivationMemory.add(action.limits.memory.megabytes)
-
-    activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new 
LongAdder()).increment()
-
-    // Timeout is a multiple of the configured maximum action duration. The 
minimum timeout is the configured standard
-    // value for action durations to avoid too tight timeouts.
-    // Timeouts in general are diluted by a configurable factor. In essence 
this factor controls how much slack you want
-    // to allow in your topics before you start reporting failed activations.
-    val timeout = (action.limits.timeout.duration.max(TimeLimit.STD_DURATION) 
* lbConfig.timeoutFactor) + 1.minute
-
-    val resultPromise = if (msg.blocking) {
-      activationPromises.getOrElseUpdate(msg.activationId, 
Promise[Either[ActivationId, WhiskActivation]]()).future
-    } else Future.successful(Left(msg.activationId))
-
-    // Install a timeout handler for the catastrophic case where an active ack 
is not received at all
-    // (because say an invoker is down completely, or the connection to the 
message bus is disrupted) or when
-    // the active ack is significantly delayed (possibly dues to long queues 
but the subject should not be penalized);
-    // in this case, if the activation handler is still registered, remove it 
and update the books.
-    activationSlots.getOrElseUpdate(
-      msg.activationId, {
-        val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
-          processCompletion(msg.activationId, msg.transid, forced = true, 
isSystemError = false, invoker = instance)
-        }
-
-        // please note: timeoutHandler.cancel must be called on all 
non-timeout paths, e.g. Success
-        ActivationEntry(
-          msg.activationId,
-          msg.user.namespace.uuid,
-          instance,
-          action.limits.memory.megabytes.MB,
-          action.limits.concurrency.maxConcurrent,
-          action.fullyQualifiedName(true),
-          timeoutHandler,
-          isBlackboxInvocation)
-      })
-
-    resultPromise
-  }
-
-  private val messageProducer = messagingProvider.getProducer(config, 
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
-
-  /** 3. Send the activation to the invoker */
-  private def sendActivationToInvoker(producer: MessageProducer,
-                                      msg: ActivationMessage,
-                                      invoker: InvokerInstanceId): 
Future[RecordMetadata] = {
-    implicit val transid: TransactionId = msg.transid
-
-    val topic = s"invoker${invoker.toInt}"
-
-    
MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)
-    val start = transid.started(this, LoggingMarkers.CONTROLLER_KAFKA)
-
-    producer.send(topic, msg).andThen {
-      case Success(status) =>
-        transid.finished(
-          this,
-          start,
-          s"posted to 
${status.topic()}[${status.partition()}][${status.offset()}]",
-          logLevel = InfoLevel)
-      case Failure(_) => transid.failed(this, start, s"error on posting to 
topic $topic")
-    }
-  }
-
-  /**
-   * Subscribes to active acks (completion messages from the invokers), and
-   * registers a handler for received active acks from invokers.
-   */
-  private val activationFeed: ActorRef =
-    feedFactory.createFeed(actorSystem, messagingProvider, 
processAcknowledgement)
-
-  /** 4. Get the active-ack message and parse it */
-  protected[loadBalancer] def processAcknowledgement(bytes: Array[Byte]): 
Future[Unit] = Future {
-    val raw = new String(bytes, StandardCharsets.UTF_8)
-    AcknowledegmentMessage.parse(raw) match {
-      case Success(m: CompletionMessage) =>
-        processCompletion(
-          m.activationId,
-          m.transid,
-          forced = false,
-          isSystemError = m.isSystemError,
-          invoker = m.invoker)
-        activationFeed ! MessageFeed.Processed
-
-      case Success(m: ResultMessage) =>
-        processResult(m.response, m.transid)
-        activationFeed ! MessageFeed.Processed
-
-      case Failure(t) =>
-        activationFeed ! MessageFeed.Processed
-        logging.error(this, s"failed processing message: $raw")
-
-      case _ =>
-        activationFeed ! MessageFeed.Processed
-        logging.error(this, s"Unexpected Acknowledgment message received by 
loadbalancer: $raw")
-    }
-  }
-
-  /** 5. Process the result ack and return it to the user */
-  private def processResult(response: Either[ActivationId, WhiskActivation], 
tid: TransactionId): Unit = {
-    val aid = response.fold(l => l, r => r.activationId)
-
-    // Resolve the promise to send the result back to the user.
-    // The activation will be removed from the activation slots later, when 
the completion message
-    // is received (because the slot in the invoker is not yet free for new 
activations).
-    activationPromises.remove(aid).foreach(_.trySuccess(response))
-    logging.info(this, s"received result ack for '$aid'")(tid)
-  }
-
-  /** Process the completion ack and update the state */
-  protected[loadBalancer] def processCompletion(aid: ActivationId,
-                                                tid: TransactionId,
-                                                forced: Boolean,
-                                                isSystemError: Boolean,
-                                                invoker: InvokerInstanceId): 
Unit = {
-
-    val invocationResult = if (forced) {
-      InvocationFinishedResult.Timeout
-    } else {
-      // If the response contains a system error, report that, otherwise 
report Success
-      // Left generally is considered a Success, since that could be a message 
not fitting into Kafka
-      if (isSystemError) {
-        InvocationFinishedResult.SystemError
-      } else {
-        InvocationFinishedResult.Success
-      }
-    }
-
-    activationSlots.remove(aid) match {
-      case Some(entry) =>
-        totalActivations.decrement()
-        val totalActivationMemory =
-          if (entry.isBlackbox) totalBlackBoxActivationMemory else 
totalManagedActivationMemory
-        totalActivationMemory.add(entry.memory.toMB * (-1))
-        activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
-        schedulingState.invokerSlots
-          .lift(invoker.toInt)
-          .foreach(_.releaseConcurrent(entry.fullyQualifiedEntityName, 
entry.maxConcurrent, entry.memory.toMB.toInt))
-        if (!forced) {
-          entry.timeoutHandler.cancel()
-          // notice here that the activationPromises is not touched, because 
the expectation is that
-          // the active ack is received as expected, and processing that 
message removed the promise
-          // from the corresponding map
-        } else {
-          // the entry has timed out; if the active ack is still around, 
remove its entry also
-          // and complete the promise with a failure if necessary
-          activationPromises
-            .remove(aid)
-            .foreach(_.tryFailure(new Throwable("no completion or active ack 
received yet")))
-        }
-
-        logging.info(this, s"${if (!forced) "received" else "forced"} 
completion ack for '$aid'")(tid)
-        // Active acks that are received here are strictly from user actions - 
health actions are not part of
-        // the load balancer's activation map. Inform the invoker pool 
supervisor of the user action completion.
-        invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
-      case None if tid == TransactionId.invokerHealth =>
-        // Health actions do not have an ActivationEntry as they are written 
on the message bus directly. Their result
-        // is important to pass to the invokerPool because they are used to 
determine if the invoker can be considered
-        // healthy again.
-        logging.info(this, s"received completion ack for health action on 
$invoker")(tid)
-        invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
-      case None if !forced =>
-        // Received an active-ack that has already been taken out of the state 
because of a timeout (forced active-ack).
-        // The result is ignored because a timeout has already been reported 
to the invokerPool per the force.
-        logging.debug(this, s"received completion ack for '$aid' which has no 
entry")(tid)
-      case None =>
-        // The entry has already been removed by an active ack. This part of 
the code is reached by the timeout and can
-        // happen if active-ack and timeout happen roughly at the same time 
(the timeout was triggered before the active
-        // ack canceled the timer). As the active ack is already processed we 
don't have to do anything here.
-        logging.debug(this, s"forced completion ack for '$aid' which has no 
entry")(tid)
-    }
-  }
-
-  private val invokerPool =
+  override val invokerPool =
     invokerPoolFactory.createInvokerPool(
       actorSystem,
       messagingProvider,
       messageProducer,
       sendActivationToInvoker,
       Some(monitor))
+
+  override protected def releaseInvoker(invoker: InvokerInstanceId, entry: 
ActivationEntry) = {
+    schedulingState.invokerSlots
+      .lift(invoker.toInt)
+      .foreach(_.releaseConcurrent(entry.fullyQualifiedEntityName, 
entry.maxConcurrent, entry.memory.toMB.toInt))
+  }
 }
 
 object ShardingContainerPoolBalancer extends LoadBalancerProvider {
@@ -548,24 +336,6 @@ object ShardingContainerPoolBalancer extends 
LoadBalancerProvider {
     logging: Logging,
     materializer: ActorMaterializer): LoadBalancer = {
 
-    val activeAckTopic = s"completed${instance.asString}"
-    val maxActiveAcksPerPoll = 128
-    val activeAckPollDuration = 1.second
-
-    val feedFactory = new FeedFactory {
-      def createFeed(f: ActorRefFactory, provider: MessagingProvider, acker: 
Array[Byte] => Future[Unit]) = {
-        f.actorOf(Props {
-          new MessageFeed(
-            "activeack",
-            logging,
-            provider.getConsumer(whiskConfig, activeAckTopic, activeAckTopic, 
maxPeek = maxActiveAcksPerPoll),
-            maxActiveAcksPerPoll,
-            activeAckPollDuration,
-            acker)
-        })
-      }
-    }
-
     val invokerPoolFactory = new InvokerPoolFactory {
       override def createInvokerPool(
         actorRefFactory: ActorRefFactory,
@@ -585,7 +355,11 @@ object ShardingContainerPoolBalancer extends 
LoadBalancerProvider {
       }
 
     }
-    new ShardingContainerPoolBalancer(whiskConfig, instance, feedFactory, 
invokerPoolFactory)
+    new ShardingContainerPoolBalancer(
+      whiskConfig,
+      instance,
+      createFeedFactory(whiskConfig, instance),
+      invokerPoolFactory)
   }
 
   def requiredProperties: Map[String, String] = kafkaHosts
@@ -613,7 +387,6 @@ object ShardingContainerPoolBalancer extends 
LoadBalancerProvider {
    *
    * @param maxConcurrent concurrency limit supported by this action
    * @param invokers a list of available invokers to search in, including 
their state
-   * @param concurrentSlots optional map of invoker -> semaphore to track 
concurrency slots for this action
    * @param dispatched semaphores for each invoker to give the slots away from
    * @param slots Number of slots, that need to be acquired (e.g. memory in MB)
    * @param index the index to start from (initially should be the 
"homeInvoker"
diff --git a/tests/build.gradle b/tests/build.gradle
index e427d72..5fda213 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -51,6 +51,20 @@ def leanExcludes = [
     '**/MaxActionDurationTests*',
 ]
 
+def systemIncludes = [
+            "org/apache/openwhisk/core/limits/**",
+            "org/apache/openwhisk/core/admin/**",
+            "org/apache/openwhisk/core/cli/test/**",
+            "org/apache/openwhisk/core/apigw/actions/test/**",
+            "org/apache/openwhisk/core/database/test/*CacheConcurrencyTests*",
+            "org/apache/openwhisk/core/controller/test/*ControllerApiTests*",
+            "apigw/healthtests/**",
+            "ha/**",
+            "services/**",
+            "system/basic/**",
+            "system/rest/**",
+]
+
 ext.testSets = [
     "REQUIRE_ONLY_DB" : [
         "includes" : [
@@ -67,19 +81,7 @@ ext.testSets = [
         ]
     ],
     "REQUIRE_SYSTEM" : [
-        "includes" : [
-            "org/apache/openwhisk/core/limits/**",
-            "org/apache/openwhisk/core/admin/**",
-            "org/apache/openwhisk/core/cli/test/**",
-            "org/apache/openwhisk/core/apigw/actions/test/**",
-            "org/apache/openwhisk/core/database/test/*CacheConcurrencyTests*",
-            "org/apache/openwhisk/core/controller/test/*ControllerApiTests*",
-            "apigw/healthtests/**",
-            "ha/**",
-            "services/**",
-            "system/basic/**",
-            "system/rest/**",
-        ],
+        "includes" : systemIncludes,
         "excludes": [
             "system/basic/WskMultiRuntimeTests*"
         ]
@@ -98,6 +100,15 @@ ext.testSets = [
         "includes" : [
             "system/basic/**"
         ]
+    ],
+    "REQUIRE_LEAN_SYSTEM" : [
+        "includes" : systemIncludes,
+
+        // Test suites below require Kafka, so they are excluded from Lean 
System tests
+        "excludes" : [
+            "**/*KafkaConnectorTests*",
+            "system/basic/WskMultiRuntimeTests*"
+        ]
     ]
 ]
 
diff --git a/tests/performance/preparation/deploy-lean.sh 
b/tests/performance/preparation/deploy-lean.sh
new file mode 100755
index 0000000..361003b
--- /dev/null
+++ b/tests/performance/preparation/deploy-lean.sh
@@ -0,0 +1,38 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+set -e
+SCRIPTDIR="$(cd "$(dirname "$0")"; pwd)"
+ROOTDIR="$SCRIPTDIR/../../.."
+
+# Build Openwhisk
+cd $ROOTDIR
+TERM=dumb ./gradlew distDocker -PdockerImagePrefix=testing $GRADLE_PROJS_SKIP
+
+# Deploy Openwhisk
+cd $ROOTDIR/ansible
+ANSIBLE_CMD="$ANSIBLE_CMD -e limit_invocations_per_minute=999999 -e 
limit_invocations_concurrent=999999 -e controller_client_auth=false -e 
userLogs_spi=\"org.apache.openwhisk.core.containerpool.logging.LogDriverLogStoreProvider\""
+
+$ANSIBLE_CMD setup.yml
+
+$ANSIBLE_CMD prereq.yml
+$ANSIBLE_CMD couchdb.yml
+$ANSIBLE_CMD initdb.yml
+$ANSIBLE_CMD wipe.yml
+
+$ANSIBLE_CMD controller.yml -e lean=true
+$ANSIBLE_CMD edge.yml
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index cda8dfd..42ddbcf 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -476,13 +476,7 @@ class ShardingContainerPoolBalancerTests
         TestProbe().testActor
     }
     val balancer =
-      new ShardingContainerPoolBalancer(
-        config,
-        ControllerInstanceId("0"),
-        feedProbe,
-        invokerPoolProbe,
-        lbConfig(0.0),
-        mockMessaging)
+      new ShardingContainerPoolBalancer(config, ControllerInstanceId("0"), 
feedProbe, invokerPoolProbe, mockMessaging)
 
     val invokers = IndexedSeq.tabulate(numInvokers) { i =>
       new InvokerHealth(InvokerInstanceId(i, userMemory = invokerMem), Healthy)
diff --git a/tools/travis/runLeanSystemTests.sh 
b/tools/travis/runLeanSystemTests.sh
new file mode 100755
index 0000000..372fbc1
--- /dev/null
+++ b/tools/travis/runLeanSystemTests.sh
@@ -0,0 +1,36 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -e
+
+SCRIPTDIR=$(cd $(dirname "$0") && pwd)
+ROOTDIR="$SCRIPTDIR/../.."
+
+cd $ROOTDIR/tools/travis
+
+export ORG_GRADLE_PROJECT_testSetName="REQUIRE_LEAN_SYSTEM"
+export GRADLE_COVERAGE=true
+
+./setupPrereq.sh /ansible/files/runtimes-nodeonly.json
+
+./distDocker-lean.sh
+
+./setupLeanSystem.sh /ansible/files/runtimes-nodeonly.json
+
+./runTests.sh
diff --git a/tools/travis/setupLeanSystem.sh b/tools/travis/setupLeanSystem.sh
new file mode 100755
index 0000000..89568b2
--- /dev/null
+++ b/tools/travis/setupLeanSystem.sh
@@ -0,0 +1,33 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -e
+
+# Build script for Travis-CI.
+SECONDS=0
+SCRIPTDIR=$(cd $(dirname "$0") && pwd)
+ROOTDIR="$SCRIPTDIR/../.."
+RUNTIMES_MANIFEST=${1:-"/ansible/files/runtimes.json"}
+
+cd $ROOTDIR/ansible
+
+$ANSIBLE_CMD openwhisk.yml -e manifest_file="$RUNTIMES_MANIFEST" -e lean=true
+$ANSIBLE_CMD apigateway.yml
+$ANSIBLE_CMD routemgmt.yml
+
+echo "Time taken for ${0##*/} is $SECONDS secs"
diff --git a/tools/vagrant/README.md b/tools/vagrant/README.md
index fcec706..0697c4b 100644
--- a/tools/vagrant/README.md
+++ b/tools/vagrant/README.md
@@ -309,3 +309,12 @@ Ignore error message `Sub-process /usr/bin/dpkg returned 
an error code (1)` when
 creating Vagrant VM using `gui-true`. Remember to use `gui=true` every time you
 do `vagrant reload`. Or, you can enable the GUI directly by editing the Vagrant
 file.
+
+## Lean Setup
+To have a lean setup (no Kafka, no Zookeeper, and no Invokers as separate 
entities),
+
+set the environment variable `LEAN` to `true` before creating the Vagrant VM:
+```
+export LEAN=true
+```
+
diff --git a/tools/vagrant/Vagrantfile b/tools/vagrant/Vagrantfile
index 2466e35..ced5983 100644
--- a/tools/vagrant/Vagrantfile
+++ b/tools/vagrant/Vagrantfile
@@ -149,7 +149,16 @@ Vagrant.configure('2') do |config|
     echo "`date`: deploy-start" >> /tmp/vagrant-times.txt
     cd ${ANSIBLE_HOME}
     su vagrant -c 'ansible-playbook -i environments/vagrant wipe.yml'
-    su vagrant -c 'ansible-playbook -i environments/vagrant openwhisk.yml -e 
invoker_use_runc=False'
+
    export LEAN=#{ENV['LEAN'] || "false"}
+    if [[ $LEAN == "true" ]]; then
+      # Deploy Lean Openwhisk (consolidated controller + invoker without 
kafka, zookeeper etc.)
+      su vagrant -c 'ansible-playbook -i environments/vagrant openwhisk.yml -e 
invoker_use_runc=False -e controller_akka_provider=local -e lean=true'
+    else
+      # Deploy full Openwhisk stack
+      su vagrant -c 'ansible-playbook -i environments/vagrant openwhisk.yml -e 
invoker_use_runc=False'
+    fi
+
     su vagrant -c 'ansible-playbook -i environments/vagrant postdeploy.yml'
     su vagrant -c 'ansible-playbook -i environments/vagrant apigateway.yml'
     su vagrant -c 'ansible-playbook -i environments/vagrant routemgmt.yml'

Reply via email to