This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 5eda221  [New Scheduler] Etcd installation & Implements EtcdClient 
(#5031)
5eda221 is described below

commit 5eda22171a238e933121b3918c5940e37fb009c5
Author: 김건희 <[email protected]>
AuthorDate: Tue Feb 9 10:23:34 2021 +0900

    [New Scheduler] Etcd installation & Implements EtcdClient (#5031)
    
    * Implements EtcdClient
    
    * Add license header
    
    * Add configuration for tests
    
    * Exclude etcd config from controller & invoker
    
    * Add config for etcd test
    
    * Separate etcd test from whisk unit test
    
    * Apply scala compilation
---
 ansible/environments/docker-machine/hosts.j2.ini   |   5 +
 ansible/environments/local/hosts.j2.ini            |   3 +
 ansible/environments/vagrant/hosts                 |   3 +
 ansible/etcd.yml                                   |  22 ++
 ansible/group_vars/all                             |  26 ++
 ansible/roles/etcd/tasks/clean.yml                 |  25 ++
 ansible/roles/etcd/tasks/deploy.yml                |  70 ++++++
 ansible/roles/etcd/tasks/main.yml                  |  26 ++
 common/scala/build.gradle                          |   3 +
 common/scala/src/main/resources/application.conf   |   5 +
 .../org/apache/openwhisk/core/WhiskConfig.scala    |   5 +
 .../apache/openwhisk/core/entity/CreationId.scala  |  84 +++++++
 .../apache/openwhisk/core/etcd/EtcdClient.scala    | 279 +++++++++++++++++++++
 .../org/apache/openwhisk/core/etcd/EtcdUtils.scala | 250 ++++++++++++++++++
 .../org/apache/openwhisk/http/ErrorResponse.scala  |   6 +
 tests/build.gradle                                 |  11 +-
 tests/src/test/resources/application.conf.j2       |  10 +
 .../apache/openwhisk/common/etcd/EtcdKvTests.scala |  74 ++++++
 .../common/etcd/EtcdLeaderShipUnitTests.scala      | 178 +++++++++++++
 19 files changed, 1084 insertions(+), 1 deletion(-)

diff --git a/ansible/environments/docker-machine/hosts.j2.ini 
b/ansible/environments/docker-machine/hosts.j2.ini
index 45f327b..e1549e6 100644
--- a/ansible/environments/docker-machine/hosts.j2.ini
+++ b/ansible/environments/docker-machine/hosts.j2.ini
@@ -39,6 +39,11 @@ invoker1                    ansible_host={{ 
docker_machine_ip }}
 [elasticsearch:children]
 db
 
+[etcd]
+etcd0            ansible_host={{ docker_machine_ip }}
+{% if mode is defined and 'HA' in mode %}
+etcd1            ansible_host={{ docker_machine_ip }}
+
 ; define variables
 [all:vars]
 ansible_connection=ssh
diff --git a/ansible/environments/local/hosts.j2.ini 
b/ansible/environments/local/hosts.j2.ini
index 7512d966..cceb706 100644
--- a/ansible/environments/local/hosts.j2.ini
+++ b/ansible/environments/local/hosts.j2.ini
@@ -39,3 +39,6 @@ db
 
 [apigateway]
 172.17.0.1          ansible_host=172.17.0.1 ansible_connection=local
+
+[etcd]
+etcd0            ansible_host=172.17.0.1 ansible_connection=local
diff --git a/ansible/environments/vagrant/hosts 
b/ansible/environments/vagrant/hosts
index e32f943..fd30391 100644
--- a/ansible/environments/vagrant/hosts
+++ b/ansible/environments/vagrant/hosts
@@ -27,3 +27,6 @@ invoker0            ansible_host=172.17.0.1 
ansible_connection=local
 
 [apigateway]
 172.17.0.1          ansible_host=172.17.0.1 ansible_connection=local
+
+[etcd]
+etcd0            ansible_host=172.17.0.1 ansible_connection=local
diff --git a/ansible/etcd.yml b/ansible/etcd.yml
new file mode 100644
index 0000000..b167585
--- /dev/null
+++ b/ansible/etcd.yml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+---
+# This playbook deploys Openwhisk Invokers.
+
+- hosts: etcd
+  roles:
+  - etcd
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 564a021..555ad88 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -51,6 +51,7 @@ whisk:
   feature_flags:
     require_api_key_annotation: "{{ require_api_key_annotation | default(true) 
| lower }}"
     require_response_payload: "{{ require_response_payload | default(true) | 
lower }}"
+  cluster_name: "{{ whisk_cluster_name | default('whisk') }}"
 
 ##
 # configuration parameters related to support runtimes (see 
org.apache.openwhisk.core.entity.ExecManifest for schema of the manifest).
@@ -424,3 +425,28 @@ user_events: "{{ user_events_enabled | default(false) | 
lower }}"
 
 durationChecker:
     timeWindow: "{{ duration_checker_time_window | default('1 d') }}"
+
+etcd:
+  version: "{{ etcd_version | default('v3.4.0') }}"
+  client:
+    port: 2379
+  server:
+    port: 2480
+  cluster:
+    token: "{{ etcd_cluster_token | default('openwhisk-etcd-token') }}"
+  dir:
+    data: "{{ etcd_data_dir | default('') }}"
+  lease:
+    timeout: "{{ etcd_lease_timeout | default(1) }}"
+  loglevel: "{{ etcd_log_level | default('info') }}"
+  quota_backend_bytes: "{{ etcd_quota_backend_bytes | default(0) }}"
+  snapshot_count: "{{ etcd_snapshot_count | default(100000) }}"
+  auto_compaction_retention: "{{ etcd_auto_compaction_retention | default(1) 
}}"
+  auto_compaction_mode: "{{ etcd_auto_compaction_mode | default('periodic') }}"
+  pool_threads: "{{ etcd_pool_threads | default(10) }}"
+
+etcd_connect_string: "{% set ret = [] %}\
+                      {% for host in groups['etcd'] %}\
+                        {{ ret.append( hostvars[host].ansible_host + ':' + 
((etcd.client.port+loop.index-1)|string) ) }}\
+                      {% endfor %}\
+                      {{ ret | join(',') }}"
diff --git a/ansible/roles/etcd/tasks/clean.yml 
b/ansible/roles/etcd/tasks/clean.yml
new file mode 100644
index 0000000..418072b
--- /dev/null
+++ b/ansible/roles/etcd/tasks/clean.yml
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+---
+# Remove etcd containers.
+
+- name: remove etcd
+  docker_container:
+    name: etcd{{ groups['etcd'].index(inventory_hostname) }}
+    keep_volumes: True
+    state: absent
+  ignore_errors: True
diff --git a/ansible/roles/etcd/tasks/deploy.yml 
b/ansible/roles/etcd/tasks/deploy.yml
new file mode 100644
index 0000000..63b908c
--- /dev/null
+++ b/ansible/roles/etcd/tasks/deploy.yml
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+---
+# This role will install etcd in group 'etcd' in the environment inventory
+
+- name: "Set the name of the etcd node"
+  set_fact:
+    name: "etcd{{ groups['etcd'].index(inventory_hostname) }}"
+
+- name: "set the volume_dir"
+  set_fact:
+    volume_dir: "{{ etcd.dir.data }}/etcd{{ 
groups['etcd'].index(inventory_hostname) }}:/etcd-data"
+  when: etcd_data_dir is defined
+
+
+- name: "Set the cluster of the etcd cluster"
+  set_fact:
+    cluster: "{% set etcdhosts = [] %}
+              {% for host in groups['etcd'] %}
+                  {{ etcdhosts.append('etcd' + ((loop.index-1)|string) + '=' + 
'http://' + hostvars[host].ansible_host + ':' + ((2480+loop.index-1)|string) ) 
}}
+              {% endfor %}
+              {{ etcdhosts | join(',') }}"
+
+- name: (re)start etcd
+  docker_container:
+    name: etcd{{ groups['etcd'].index(inventory_hostname) }}
+    image: quay.io/coreos/etcd:{{ etcd.version }}
+    state: started
+    recreate: true
+    restart_policy: "{{ docker.restart.policy }}"
+    volumes: "{{volume_dir | default([])}}"
+    command: "/usr/local/bin/etcd \
+               --data-dir=/etcd-data --name '{{ name }}' \
+               --initial-advertise-peer-urls http://{{ ansible_host }}:{{ 
etcd.server.port + groups['etcd'].index(inventory_hostname) }} \
+               --advertise-client-urls http://{{ ansible_host }}:{{ 
etcd.client.port + groups['etcd'].index(inventory_hostname) }} \
+               --listen-peer-urls http://0.0.0.0:{{ etcd.server.port + 
groups['etcd'].index(inventory_hostname) }} \
+               --listen-client-urls http://0.0.0.0:{{ etcd.client.port + 
groups['etcd'].index(inventory_hostname) }} \
+               --initial-cluster {{ cluster }} \
+               --initial-cluster-state new --initial-cluster-token {{ 
etcd.cluster.token }} \
+               --quota-backend-bytes {{ etcd.quota_backend_bytes }} \
+               --snapshot-count {{ etcd.snapshot_count }} \
+               --auto-compaction-retention {{ etcd.auto_compaction_retention 
}} \
+               --auto-compaction-mode {{ etcd.auto_compaction_mode }} \
+               --log-level {{ etcd.loglevel }}"
+    ports:
+      - "{{ etcd.client.port + groups['etcd'].index(inventory_hostname) }}:{{ 
etcd.client.port + groups['etcd'].index(inventory_hostname) }}"
+      - "{{ etcd.server.port + groups['etcd'].index(inventory_hostname) }}:{{ 
etcd.server.port + groups['etcd'].index(inventory_hostname) }}"
+    pull: "{{ etcd.pull_etcd | default(true) }}"
+
+- name: wait until etcd in this host is up and running
+  uri:
+    url: "http://{{ ansible_host }}:{{ etcd.client.port + 
groups['etcd'].index(inventory_hostname) }}/health"
+  register: result
+  until: result.status == 200
+  retries: 12
+  delay: 5
diff --git a/ansible/roles/etcd/tasks/main.yml 
b/ansible/roles/etcd/tasks/main.yml
new file mode 100644
index 0000000..48505a8
--- /dev/null
+++ b/ansible/roles/etcd/tasks/main.yml
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+---
+# This role will install etcd in group 'etcd' in the environment inventory
+# In deploy mode it will deploy etcd containers.
+# In clean mode it will remove etcd containers.
+
+- import_tasks: deploy.yml
+  when: mode == "deploy"
+
+- import_tasks: clean.yml
+  when: mode == "clean"
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 82f2a8c..ffd7480 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -73,6 +73,9 @@ dependencies {
     //for mesos
     compile "com.adobe.api.platform.runtime:mesos-actor:0.0.17"
 
+    // for etcd
+    compile("com.ibm.etcd:etcd-java:0.0.13")
+
     //tracing support
     compile "io.opentracing:opentracing-api:0.31.0"
     compile "io.opentracing:opentracing-util:0.31.0"
diff --git a/common/scala/src/main/resources/application.conf 
b/common/scala/src/main/resources/application.conf
index ea96e92..a894360 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -383,6 +383,11 @@ whisk {
         local-image-prefix = "whisk"
     }
 
+    # cluster name related etcd configuration
+    cluster {
+        name = "whisk"
+    }
+
     user-events {
         enabled = false
     }
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 84e75e8..ba15f5f 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -265,6 +265,9 @@ object ConfigKeys {
   val controller = s"whisk.controller"
   val controllerActivation = s"$controller.activation"
 
+  val etcd = "whisk.etcd"
+  val etcdPoolThreads = "whisk.etcd.pool.threads"
+
   val activationStore = "whisk.activation-store"
   val elasticSearchActivationStore = s"$activationStore.elasticsearch"
   val activationStoreWithFileStorage = s"$activationStore.with-file-storage"
@@ -286,4 +289,6 @@ object ConfigKeys {
   val parameterStorage = "whisk.parameter-storage"
 
   val azBlob = "whisk.azure-blob"
+
+  val whiskClusterName = "whisk.cluster.name"
 }
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/CreationId.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/CreationId.scala
new file mode 100644
index 0000000..a55fc85
--- /dev/null
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/CreationId.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.entity
+
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.http.Messages
+import spray.json.DefaultJsonProtocol.StringJsonFormat
+import spray.json.{JsObject, _}
+
+import scala.util.{Failure, Success, Try}
+
+protected[openwhisk] case class CreationId private (val asString: String) 
extends AnyVal {
+  override def toString: String = asString
+  def toJsObject: JsObject = JsObject("creationId" -> asString.toJson)
+}
+
+protected[core] object CreationId {
+
+  protected[core] trait CreationIdGenerator {
+    def make(): CreationId = CreationId.generate()
+  }
+
+  /** Checks if the current character is hexadecimal */
+  private def isHexadecimal(c: Char) = c.isDigit || c == 'a' || c == 'b' || c 
== 'c' || c == 'd' || c == 'e' || c == 'f'
+
+  /**
+   * Parses an creation id from a string.
+   *
+   * @param id the creation id as string
+   * @return CreationId instance
+   */
+  def parse(id: String): Try[CreationId] = {
+    val length = id.length
+    if (length != 32) {
+      Failure(new 
IllegalArgumentException(Messages.creationIdLengthError(SizeError("Creation 
id", length.B, 32.B))))
+    } else if (!id.forall(isHexadecimal)) {
+      Failure(new IllegalArgumentException(Messages.creationIdIllegal))
+    } else {
+      Success(new CreationId(id))
+    }
+  }
+
+  /**
+   * Generates a random creation id using java.util.UUID factory.
+   *
+   * Uses fast path to generate the CreationId without additional requirement 
checks.
+   *
+   * @return new CreationId
+   */
+  protected[core] def generate(): CreationId = new 
CreationId(UUIDs.randomUUID().toString.filterNot(_ == '-'))
+
+  protected[core] implicit val serdes: RootJsonFormat[CreationId] = new 
RootJsonFormat[CreationId] {
+    def write(d: CreationId) = JsString(d.toString)
+
+    def read(value: JsValue): CreationId = {
+      val parsed = value match {
+        case JsString(s) => CreationId.parse(s)
+        case JsNumber(n) => CreationId.parse(n.toString)
+        case _           => 
Failure(DeserializationException(Messages.creationIdIllegal))
+      }
+
+      parsed match {
+        case Success(cid)                         => cid
+        case Failure(t: IllegalArgumentException) => 
deserializationError(t.getMessage)
+        case Failure(_)                           => 
deserializationError(Messages.creationIdIllegal)
+      }
+    }
+  }
+}
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdClient.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdClient.scala
new file mode 100644
index 0000000..1142747
--- /dev/null
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdClient.scala
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.etcd
+
+import com.google.common.util.concurrent.{FutureCallback, Futures, 
ListenableFuture}
+import com.ibm.etcd.api._
+import com.ibm.etcd.client.kv.KvClient.Watch
+import com.ibm.etcd.client.kv.{KvClient, WatchUpdate}
+import com.ibm.etcd.client.{EtcdClient => Client}
+import io.grpc.stub.StreamObserver
+import java.util.concurrent.Executors
+
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.EtcdType._
+import pureconfig.loadConfigOrThrow
+import spray.json.DefaultJsonProtocol
+
+import scala.language.implicitConversions
+import scala.annotation.tailrec
+import scala.concurrent.{ExecutionContextExecutor, Future, Promise}
+
+case class Lease(id: Long, ttl: Long)
+
+object RichListenableFuture {
+  implicit def convertToFuture[T](lf: ListenableFuture[T])(implicit ece: 
ExecutionContextExecutor): Future[T] = {
+    val p = Promise[T]()
+    Futures.addCallback(lf, new FutureCallback[T] {
+      def onFailure(t: Throwable): Unit = p failure t
+      def onSuccess(result: T): Unit = p success result
+    }, ece)
+    p.future
+  }
+}
+
+object EtcdClient {
+  // hostAndPorts format: {HOST}:{PORT}[,{HOST}:{PORT},{HOST}:{PORT}, ...]
+  def apply(hostAndPorts: String)(implicit ece: ExecutionContextExecutor): 
EtcdClient = {
+    require(hostAndPorts != null)
+    val client: Client = 
Client.forEndpoints(hostAndPorts).withPlainText().build()
+    new EtcdClient(client)(ece)
+  }
+
+  def apply(client: Client)(implicit ece: ExecutionContextExecutor): 
EtcdClient = {
+    new EtcdClient(client)(ece)
+  }
+}
+
+class EtcdClient(val client: Client)(override implicit val ece: 
ExecutionContextExecutor)
+    extends EtcdKeyValueApi
+    with EtcdLeaseApi
+    with EtcdWatchApi
+    with EtcdLeadershipApi {
+
+  def close() = {
+    client.close()
+  }
+}
+
+trait EtcdKeyValueApi extends KeyValueStore {
+  import RichListenableFuture._
+  protected[etcd] val client: Client
+
+  override def get(key: String): Future[RangeResponse] =
+    client.getKvClient.get(key).async()
+
+  override def getPrefix(prefixKey: String): Future[RangeResponse] = {
+    client.getKvClient.get(prefixKey).asPrefix().async()
+  }
+
+  override def getCount(prefixKey: String): Future[Long] = {
+    
client.getKvClient.get(prefixKey).asPrefix().countOnly().async().map(_.getCount)
+  }
+
+  override def put(key: String, value: String): Future[PutResponse] =
+    client.getKvClient.put(key, value).async().recoverWith {
+      case t =>
+        Future.failed[PutResponse](getNestedException(t))
+    }
+
+  override def put(key: String, value: String, leaseId: Long): 
Future[PutResponse] =
+    client.getKvClient
+      .put(key, value, leaseId)
+      .async()
+      .recoverWith {
+        case t =>
+          Future.failed[PutResponse](getNestedException(t))
+      }
+
+  def put(key: String, value: Boolean): Future[PutResponse] = {
+    put(key, value.toString)
+  }
+
+  def put(key: String, value: Boolean, leaseId: Long): Future[PutResponse] = {
+    put(key, value.toString, leaseId)
+  }
+
+  override def del(key: String): Future[DeleteRangeResponse] =
+    client.getKvClient.delete(key).async().recoverWith {
+      case t =>
+        Future.failed[DeleteRangeResponse](getNestedException(t))
+    }
+
+  override def putTxn[T](key: String, value: T, cmpVersion: Long, leaseId: 
Long): Future[TxnResponse] = {
+    client.getKvClient
+      .txnIf()
+      .cmpEqual(key)
+      .version(cmpVersion)
+      .`then`()
+      .put(client.getKvClient
+        .put(key, value.toString, leaseId)
+        .asRequest())
+      .async()
+      .recoverWith {
+        case t =>
+          Future.failed[TxnResponse](getNestedException(t))
+      }
+  }
+
+  @tailrec
+  private def getNestedException(t: Throwable): Throwable = {
+    if (t.getCause == null) t
+    else getNestedException(t.getCause)
+  }
+}
+
+trait KeyValueStore {
+
+  implicit val ece: ExecutionContextExecutor
+
+  def get(key: String): Future[RangeResponse]
+
+  def getPrefix(prefixKey: String): Future[RangeResponse]
+
+  def getCount(prefixKey: String): Future[Long]
+
+  def put(key: String, value: String): Future[PutResponse]
+
+  def put(key: String, value: String, leaseId: Long): Future[PutResponse]
+
+  def del(key: String): Future[DeleteRangeResponse]
+
+  def putTxn[T](key: String, value: T, cmpVersion: Long, leaseId: Long): 
Future[TxnResponse]
+}
+
+trait EtcdLeaseApi {
+  import RichListenableFuture._
+  implicit val ece: ExecutionContextExecutor
+
+  protected[etcd] val client: Client
+  protected val DEFAULT_TTL = 2
+
+  def grant(ttl: Long = DEFAULT_TTL): Future[LeaseGrantResponse] = {
+    client.getLeaseClient.grant(ttl).async()
+  }
+
+  def revoke(leaseId: Long): Future[LeaseRevokeResponse] = {
+    client.getLeaseClient.revoke(leaseId)
+  }
+
+  def keepAliveOnce(leaseId: Long): Future[LeaseKeepAliveResponse] = {
+    client.getLeaseClient.keepAliveOnce(leaseId)
+  }
+}
+
+trait EtcdWatchApi {
+  val nThreads = loadConfigOrThrow[Int](ConfigKeys.etcdPoolThreads)
+  val threadpool = Executors.newFixedThreadPool(nThreads);
+  protected[etcd] val client: Client
+
+  def watchAllKeys(next: WatchUpdate => Unit = (_: WatchUpdate) => {},
+                   error: Throwable => Unit = (_: Throwable) => {},
+                   completed: () => Unit = () => {}): Watch = {
+    client.getKvClient
+      .watch(KvClient.ALL_KEYS)
+      .prevKv()
+      .executor(threadpool)
+      .start(new StreamObserver[WatchUpdate]() {
+        override def onNext(value: WatchUpdate): Unit = {
+          next(value)
+        }
+
+        override def onError(t: Throwable): Unit = {
+          error(t)
+        }
+
+        override def onCompleted(): Unit = {
+          completed()
+        }
+      })
+  }
+
+  def watch(key: String, isPrefix: Boolean = false)(next: WatchUpdate => Unit 
= (_: WatchUpdate) => {},
+                                                    error: Throwable => Unit = 
(_: Throwable) => {},
+                                                    completed: () => Unit = () 
=> {}): Watch = {
+    val watchRequest = if (isPrefix) {
+      client.getKvClient.watch(key).asPrefix().prevKv()
+    } else {
+      client.getKvClient.watch(key).prevKv()
+    }
+    watchRequest
+      .executor(threadpool)
+      .start(new StreamObserver[WatchUpdate]() {
+        override def onNext(value: WatchUpdate): Unit = {
+          next(value)
+        }
+
+        override def onError(t: Throwable): Unit = {
+          error(t)
+        }
+
+        override def onCompleted(): Unit = {
+          completed()
+        }
+      })
+  }
+
+}
+
+trait EtcdLeadershipApi extends EtcdKeyValueApi with EtcdLeaseApi with 
EtcdWatchApi {
+
+  protected[etcd] val client: Client
+
+  val initVersion = 0
+
+  def electLeader(key: String, value: String, timeout: Long = 60): 
Future[Either[EtcdFollower, EtcdLeader]] =
+    for {
+      leaseResp <- grant(timeout)
+      txnResp <- putTxn(key, value, initVersion, leaseResp.getID)
+      result <- Future {
+        if (txnResp.getSucceeded) {
+          Right(EtcdLeader(key, value, leaseResp.getID))
+        } else {
+          Left(EtcdFollower(key, value))
+        }
+      }
+    } yield result
+
+  def electLeader(key: String, value: String, lease: Lease): 
Future[Either[EtcdFollower, EtcdLeader]] =
+    for {
+      txnResp <- putTxn(key, value, initVersion, lease.id)
+      result <- Future {
+        if (txnResp.getSucceeded) {
+          Right(EtcdLeader(key, value, lease.id))
+        } else {
+          Left(EtcdFollower(key, value))
+        }
+      }
+    } yield result
+
+  def keepAliveLeader(leaseId: Long): Future[Long] =
+    keepAliveOnce(leaseId).map(res => res.getID)
+
+}
+case class EtcdLeader(key: String, value: String, leaseId: Long)
+
+object EtcdLeader extends DefaultJsonProtocol {
+  implicit val serdes = jsonFormat3(EtcdLeader.apply)
+}
+
+case class EtcdFollower(key: String, value: String)
+
+object EtcdFollower extends DefaultJsonProtocol {
+  implicit val serdes = jsonFormat2(EtcdFollower.apply)
+}
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdUtils.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdUtils.scala
new file mode 100644
index 0000000..3da9d6b
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdUtils.scala
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.etcd
+
+import java.nio.charset.StandardCharsets
+
+import com.google.protobuf.ByteString
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.containerpool.ContainerId
+import org.apache.openwhisk.core.entity.SizeUnits.MB
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import pureconfig.loadConfigOrThrow
+
+import scala.language.implicitConversions
+import scala.util.Try
+
+case class EtcdConfig(hosts: String)
+
+case class EtcdException(msg: String) extends Exception(msg)
+
+/**
+ * If you import the line below, it implicitly converts ByteString type to 
Scala Type.
+ *
+ * import org.apache.openwhisk.core.etcd.EtcdType._
+ */
+object EtcdType {
+
+  implicit def stringToByteString(str: String): ByteString = 
ByteString.copyFromUtf8(str)
+
+  implicit def ByteStringToString(byteString: ByteString): String = 
byteString.toString(StandardCharsets.UTF_8)
+
+  implicit def ByteStringToInt(byteString: ByteString): Int = 
byteString.toString(StandardCharsets.UTF_8).toInt
+
+  implicit def IntToByteString(int: Int): ByteString = 
ByteString.copyFromUtf8(int.toString)
+
+  implicit def ByteStringToLong(byteString: ByteString): Long = 
byteString.toString(StandardCharsets.UTF_8).toLong
+
+  implicit def LongToByteString(long: Long): ByteString = 
ByteString.copyFromUtf8(long.toString)
+
+  implicit def ByteStringToBoolean(byteString: ByteString): Boolean =
+    byteString.toString(StandardCharsets.UTF_8).toBoolean
+
+  implicit def BooleanToByteString(bool: Boolean): ByteString = 
ByteString.copyFromUtf8(bool.toString)
+
+  implicit def ByteStringToByteSize(byteString: ByteString): ByteSize =
+    ByteSize(byteString.toString(StandardCharsets.UTF_8).toLong, MB)
+
+  implicit def ByteSizeToByteString(byteSize: ByteSize): ByteString = 
ByteString.copyFromUtf8(byteSize.toMB.toString)
+
+}
+
+object EtcdKV {
+
+  val TOP = "\ufff0"
+
+  val clusterName = loadConfigOrThrow[String](ConfigKeys.whiskClusterName)
+
+  object SchedulerKeys {
+    val prefix = s"$clusterName/scheduler"
+
+    val scheduler = s"$prefix"
+
+    /**
+     *  The keys for states of schedulers
+     */
+    def scheduler(instanceId: SchedulerInstanceId) = 
s"$prefix/${instanceId.asString}"
+
+  }
+
+  object QueueKeys {
+
+    val inProgressPrefix = s"$clusterName/in-progress"
+    val queuePrefix = s"$clusterName/queue"
+
+    /**
+     * The keys for in-progress queue
+     */
+    def inProgressQueue(invocationNamespace: String, fqn: 
FullyQualifiedEntityName) =
+      s"$inProgressPrefix/queue/$invocationNamespace/${fqn.copy(version = 
None)}"
+
+    /**
+     * The prefix key for state in the queue
+     */
+    def queuePrefix(invocationNamespace: String, fqn: 
FullyQualifiedEntityName): String =
+      s"$queuePrefix/$invocationNamespace/${fqn.copy(version = None)}"
+
+    /**
+     * The keys for state in the queue
+     *
+     * Example
+     *  - queue/invocationNs/ns/pkg/act/leader
+     *  - queue/invocationNs/ns/pkg/act/follower/scheduler1
+     *  - queue/invocationNs/ns/pkg/act/follower/scheduler2
+     *
+     */
+    def queue(invocationNamespace: String,
+              fqn: FullyQualifiedEntityName,
+              leader: Boolean,
+              schedulerInstanceId: Option[SchedulerInstanceId] = None): String 
= {
+      require(leader || (!leader && schedulerInstanceId.isDefined))
+      val prefix = s"$queuePrefix/$invocationNamespace/${fqn.copy(version = 
None)}"
+      if (leader)
+        s"$prefix/leader"
+      else
+        s"$prefix/follower/${schedulerInstanceId.get.asString}"
+    }
+  }
+
+  object ThrottlingKeys {
+    val prefix = s"$clusterName/throttling"
+
+    /**
+     *  The keys for namespace throttling
+     */
+    def namespace(namespace: EntityName) = s"$prefix/$namespace"
+
+    /**
+     *  The keys for action throttling
+     */
+    def action(invocationNamespace: String, fqn: FullyQualifiedEntityName) =
+      s"$prefix/$invocationNamespace/${fqn.copy(version = None)}"
+
+    /**
+     *  The keys for action throttling
+     */
+    def action(invocationNamespace: String, fqn: String) = 
s"$prefix/$invocationNamespace/$fqn"
+
+  }
+
+  object ContainerKeys {
+    val namespacePrefix = s"$clusterName/namespace"
+    val inProgressPrefix = s"$clusterName/in-progress"
+    val warmedPrefix = s"$clusterName/warmed"
+
+    /**
+     * The keys for the number of container
+     */
+    def containerPrefix(containerType: String,
+                        invocationNamespace: String,
+                        fqn: FullyQualifiedEntityName,
+                        revision: Option[DocRevision] = None): String =
+      s"$containerType/$invocationNamespace/${fqn.copy(version = 
None)}/${revision.map(r => s"$r/").getOrElse("")}"
+
+    /**
+     * The keys for in-progress container
+     *
+     * For count queries, fqn must be at the front.
+     */
+    def inProgressContainer(invocationNamespace: String,
+                            fqn: FullyQualifiedEntityName,
+                            revision: DocRevision,
+                            sid: SchedulerInstanceId,
+                            cid: CreationId): String =
+      s"${containerPrefix(inProgressPrefix, invocationNamespace, fqn, 
Some(revision))}scheduler/${sid.asString}/creationId/$cid"
+
+    /**
+     * The keys for the number of warmed container
+     */
+    def warmedContainers(invocationNamespace: String,
+                         fqn: FullyQualifiedEntityName,
+                         revision: DocRevision,
+                         invokerInstanceId: InvokerInstanceId,
+                         containerId: ContainerId): String =
+      s"${containerPrefix(warmedPrefix, invocationNamespace, fqn, 
Some(revision))}invoker/${invokerInstanceId.instance}/container/${containerId.asString}"
+
+    /**
+     * The keys for the number of existing container
+     */
+    def existingContainers(invocationNamespace: String,
+                           fqn: FullyQualifiedEntityName,
+                           revision: DocRevision,
+                           invoker: Option[InvokerInstanceId] = None,
+                           containerId: Option[ContainerId] = None): String =
+      containerPrefix(namespacePrefix, invocationNamespace, fqn, 
Some(revision)) + invoker
+        .map(id => s"invoker${id.toInt}/")
+        .getOrElse("") + containerId
+        .map(id => s"container/${id.asString}")
+        .getOrElse("")
+
+    /**
+     * The keys for the number of in-progress container by namespace
+     */
+    def inProgressContainerPrefixByNamespace(invocationNamespace: String): 
String =
+      s"$inProgressPrefix/$invocationNamespace/"
+
+    /**
+     * The keys for the number of existing container by namespace
+     */
+    def existingContainersPrefixByNamespace(invocationNamespace: String): 
String =
+      s"$namespacePrefix/$invocationNamespace/"
+
+  }
+
+  object InvokerKeys {
+    val prefix = s"$clusterName/invokers"
+
+    /**
+     * If displayName only exists in the etcd key, we cannot differentiate it 
with the uniqueName
+     */
+    def health(invokerInstanceId: InvokerInstanceId) = {
+      (invokerInstanceId.uniqueName, invokerInstanceId.displayedName) match {
+        case (Some(uniqueName), Some(displayName)) => 
s"$prefix/${invokerInstanceId.toInt}/$uniqueName/$displayName"
+        case (Some(uniqueName), None)              => 
s"$prefix/${invokerInstanceId.toInt}/$uniqueName"
+        case _                                     => 
s"$prefix/${invokerInstanceId.toInt}"
+      }
+    }
+
+    // id is not supposed to be -1
+    private def getId(id: String): Int = {
+      Try { id.toInt } getOrElse (-1)
+    }
+
+    def getInstanceId(invokerKey: String): InvokerInstanceId = {
+      val constructs = invokerKey.split("\\b/+")
+      constructs match {
+        case Array(_, _, id, uniqueName, displayName) =>
+          InvokerInstanceId(getId(id), Some(uniqueName), Some(displayName), 
0.B)
+        case Array(_, _, id, uniqueName) =>
+          InvokerInstanceId(getId(id), Some(uniqueName), userMemory = 0.B)
+        case Array(_, _, id) =>
+          InvokerInstanceId(getId(id), userMemory = 0.B)
+      }
+    }
+  }
+
+  object InstanceKeys {
+
+    val instancePrefix = s"$clusterName/instance"
+
+    def instanceLease(instanceId: InstanceId): String =
+      s"$instancePrefix/$instanceId/lease"
+  }
+
+}
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala
index 5424dd3..695ec17 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala
@@ -99,6 +99,12 @@ object Messages {
     s"${error.field} length is ${error.is.toBytes} but must be 
${error.allowed.toBytes}."
   }
 
+  /** Standard error for malformed creation id. */
+  val creationIdIllegal = "The creation id is not valid."
+  def creationIdLengthError(error: SizeError) = {
+    s"${error.field} length is ${error.is.toBytes} but must be 
${error.allowed.toBytes}."
+  }
+
   /** Error messages for sequence actions. */
   val sequenceIsTooLong = "Too many actions in the sequence."
   val sequenceNoComponent = "No component specified for the sequence."
diff --git a/tests/build.gradle b/tests/build.gradle
index 49fac12..f75800e 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -73,10 +73,11 @@ ext.testSets = [
             "org/apache/openwhisk/core/cli/test/**",
             "org/apache/openwhisk/core/limits/**",
             
"org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheck*",
+            "org/apache/openwhisk/common/etcd/**",
             "**/*CacheConcurrencyTests*",
             "**/*ControllerApiTests*",
             "org/apache/openwhisk/testEntities/**",
-            'invokerShoot/**'
+            "invokerShoot/**"
         ]
     ],
     "REQUIRE_SYSTEM" : [
@@ -197,6 +198,14 @@ task testUnit(type: Test) {
     exclude couchDbExcludes
 }
 
+task testUnitEtcd(type: Test) {
+    def etcdUnitIncludes = [
+        "org/apache/openwhisk/common/etcd/**"
+    ]
+
+    include etcdUnitIncludes
+}
+
 dependencies {
     compile "org.scala-lang:scala-library:${gradle.scala.version}"
     compile "org.apache.commons:commons-lang3:3.3.2"
diff --git a/tests/src/test/resources/application.conf.j2 
b/tests/src/test/resources/application.conf.j2
index 90802bc..cdae2bd 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -118,6 +118,16 @@ whisk {
             password      = "{{ db.elasticsearch.auth.admin.password }}"
         }
     }
+
+    etcd {
+        hosts = "{{ etcd_connect_string }}"
+        lease {
+            timeout = "{{ etcd.lease.timeout }}"
+        }
+        pool {
+            threads = "{{ etcd.pool_threads }}"
+        }
+    }
 }
 
 #test-only overrides so that tests can override defaults in application.conf 
(todo: move all defaults to reference.conf)
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdKvTests.scala 
b/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdKvTests.scala
new file mode 100644
index 0000000..a93cda0
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdKvTests.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.common.etcd
+
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.entity.InvokerInstanceId
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.etcd.EtcdKV.InvokerKeys
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+import pureconfig.loadConfigOrThrow
+
+@RunWith(classOf[JUnitRunner])
+class EtcdKvTests extends FlatSpec with ScalaFutures with Matchers {
+
+  behavior of "InvokerKeys"
+
+  val clusterName = loadConfigOrThrow[String](ConfigKeys.whiskClusterName)
+  val uniqueName = "myUniqueName"
+  val displayedName = "myDisplayedName"
+
+  it should "serialize a InvokerInstanceId to a health-key if there is only 
id" in {
+    val instanceId = InvokerInstanceId(0, userMemory = 0.MB)
+    InvokerKeys.health(instanceId) shouldBe s"$clusterName/invokers/0"
+  }
+
+  it should "serialize a InvokerInstanceId to a health-key if there are id and 
unique name" in {
+    val instanceId = InvokerInstanceId(0, Some(uniqueName), userMemory = 0.MB)
+    InvokerKeys.health(instanceId) shouldBe 
s"$clusterName/invokers/0/$uniqueName"
+  }
+
+  it should "serialize a InvokerInstanceId to a health-key if there are id, 
unique name and displayed name" in {
+    val instanceId = InvokerInstanceId(0, Some(uniqueName), 
Some(displayedName), userMemory = 0.MB)
+    InvokerKeys.health(instanceId) shouldBe 
s"$clusterName/invokers/0/$uniqueName/$displayedName"
+  }
+
+  it should "deserialize InvokerInstanceId from ETCD key if there is only id" 
in {
+    val testKey = "$clusterName/invokers/0"
+    val instanceId = InvokerKeys.getInstanceId(testKey)
+
+    instanceId shouldBe InvokerInstanceId(0, userMemory = 0.MB)
+  }
+
+  it should "deserialize InvokerInstanceId from ETCD key with id and a unique 
name" in {
+    val testKey = s"$clusterName/invokers/0/$uniqueName"
+    val instanceId = InvokerKeys.getInstanceId(testKey)
+
+    instanceId shouldBe InvokerInstanceId(0, Some(uniqueName), userMemory = 
0.MB)
+  }
+
+  it should "deserialize InvokerInstanceId from ETCD key with id, a unique 
name, and a displayed name" in {
+    val testKey = s"$clusterName/invokers/0/$uniqueName/$displayedName"
+    val instanceId = InvokerKeys.getInstanceId(testKey)
+
+    instanceId shouldBe InvokerInstanceId(0, Some(uniqueName), 
Some(displayedName), userMemory = 0.MB)
+  }
+}
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdLeaderShipUnitTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdLeaderShipUnitTests.scala
new file mode 100644
index 0000000..816693a
--- /dev/null
+++ 
b/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdLeaderShipUnitTests.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.common.etcd
+
+import java.util.concurrent.Executor
+import java.{lang, util}
+
+import com.ibm.etcd.api.Event.EventType
+import com.ibm.etcd.api._
+import com.ibm.etcd.client.kv.KvClient.Watch
+import com.ibm.etcd.client.kv.WatchUpdate
+import com.ibm.etcd.client.{EtcdClient => Client}
+import common.{StreamLogging, WskActorSystem}
+import io.grpc.{StatusRuntimeException, Status => GrpcStatus}
+import org.apache.openwhisk.core.etcd.EtcdType._
+import org.apache.openwhisk.core.etcd.{EtcdFollower, EtcdLeader, 
EtcdLeadershipApi, Lease}
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContextExecutor, Future}
+
+@RunWith(classOf[JUnitRunner])
+class EtcdLeaderShipUnitTests extends FlatSpec with ScalaFutures with Matchers 
with WskActorSystem with StreamLogging {
+
+  implicit val timeout = Timeout(2.seconds)
+  private val leaderKey = "openwhiskleader"
+  private val endpoints = "endpoints"
+  private val leaseId = 60
+
+  class mockWatchUpdate extends WatchUpdate {
+    private var eventLists: util.List[Event] = new util.ArrayList[Event]()
+    override def getHeader: ResponseHeader = ???
+
+    def addEvents(event: Event): WatchUpdate = {
+      eventLists.add(event)
+      this
+    }
+
+    override def getEvents: util.List[Event] = eventLists
+  }
+
+  class MockEtcdLeadershipApi extends EtcdLeadershipApi {
+
+    override implicit val ece: ExecutionContextExecutor = 
actorSystem.dispatcher
+
+    override val client: Client = {
+      val hostAndPorts = "172.17.0.1:2379"
+      Client.forEndpoints(hostAndPorts).withPlainText().build()
+    }
+
+    var onNext: WatchUpdate => Unit = null
+
+    override def grant(ttl: Long): Future[LeaseGrantResponse] =
+      
Future.successful(LeaseGrantResponse.newBuilder().setID(leaseId).setTTL(ttl).build())
+
+    override def keepAliveOnce(leaseId: Long): Future[LeaseKeepAliveResponse] =
+      
Future.successful(LeaseKeepAliveResponse.newBuilder().setID(leaseId).build())
+
+    override def putTxn[T](key: String, value: T, cmpVersion: Long, leaseId: 
Long): Future[TxnResponse] =
+      Future.successful(TxnResponse.newBuilder().setSucceeded(true).build())
+
+    override def watch(key: String, isPrefix: Boolean)(next: WatchUpdate => 
Unit,
+                                                       error: Throwable => 
Unit,
+                                                       completed: () => Unit): 
Watch = {
+      onNext = next
+      new Watch {
+        override def close(): Unit = {}
+
+        override def addListener(listener: Runnable, executor: Executor): Unit 
= {}
+
+        override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
+
+        override def isCancelled: Boolean = true
+
+        override def isDone: Boolean = true
+
+        override def get(): lang.Boolean = true
+
+        override def get(timeout: Long, unit: TimeUnit): lang.Boolean = true
+      }
+    }
+
+    def publishEvents(eventType: EventType, key: String, value: String): Unit 
= {
+      val eType = eventType match {
+        case EventType.PUT          => EventType.PUT
+        case EventType.DELETE       => EventType.DELETE
+        case EventType.UNRECOGNIZED => EventType.UNRECOGNIZED
+      }
+
+      val event = Event
+        .newBuilder()
+        .setType(eType)
+        .setKv(
+          KeyValue
+            .newBuilder()
+            .setKey(key)
+            .setValue(value)
+            .build())
+        .build()
+      onNext(new mockWatchUpdate().addEvents(event))
+    }
+  }
+
+  behavior of "Etcd Leadership Client"
+
+  "Etcd LeaderShip client" should "elect leader successfully" in {
+    val mockLeaderShipClient = new MockEtcdLeadershipApi
+
+    val either = mockLeaderShipClient.electLeader(leaderKey, 
endpoints).futureValue(timeout)
+    either.right.get shouldBe EtcdLeader(leaderKey, endpoints, leaseId)
+  }
+
+  "Etcd LeaderShip client" should "be failed to elect leader" in {
+    val mockLeaderShipClient = new MockEtcdLeadershipApi() {
+      override def putTxn[T](key: String, value: T, cmpVersion: Long, leaseId: 
Long): Future[TxnResponse] =
+        Future.successful(TxnResponse.newBuilder().setSucceeded(false).build())
+    }
+
+    val either = mockLeaderShipClient.electLeader(leaderKey, 
endpoints).futureValue(timeout)
+    either.left.get shouldBe EtcdFollower(leaderKey, endpoints)
+
+  }
+
+  "Etcd LeaderShip client" should "elect leader successfully with provided 
lease" in {
+    val mockLeaderShipClient = new MockEtcdLeadershipApi
+
+    val either = mockLeaderShipClient.electLeader(leaderKey, endpoints, 
Lease(leaseId, 60)).futureValue(timeout)
+    either.right.get shouldBe EtcdLeader(leaderKey, endpoints, leaseId)
+  }
+
+  "Etcd LeaderShip client" should "be failed to elect leader with provided 
lease" in {
+    val mockLeaderShipClient = new MockEtcdLeadershipApi() {
+      override def putTxn[T](key: String, value: T, cmpVersion: Long, leaseId: 
Long): Future[TxnResponse] =
+        Future.successful(TxnResponse.newBuilder().setSucceeded(false).build())
+    }
+
+    val either = mockLeaderShipClient.electLeader(leaderKey, endpoints, 
Lease(leaseId, 60)).futureValue(timeout)
+    either.left.get shouldBe EtcdFollower(leaderKey, endpoints)
+  }
+
+  "Etcd LeaderShip client" should "throw StatusRuntimeException when provided 
lease doesn't exist" in {
+    val mockLeaderShipClient = new MockEtcdLeadershipApi() {
+      override def putTxn[T](key: String, value: T, cmpVersion: Long, leaseId: 
Long): Future[TxnResponse] =
+        Future.failed(new StatusRuntimeException(GrpcStatus.NOT_FOUND))
+    }
+
+    mockLeaderShipClient
+      .electLeader(leaderKey, endpoints, Lease(leaseId, 60))
+      .failed
+      .futureValue shouldBe a[StatusRuntimeException]
+  }
+
+  "Etcd LeaderShip client" should "keep alive leader key" in {
+    val mockLeaderShipClient = new MockEtcdLeadershipApi
+
+    mockLeaderShipClient.keepAliveLeader(leaseId).futureValue(timeout) 
shouldBe leaseId
+  }
+
+}

Reply via email to