This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 5eda221 [New Scheduler] Etcd installation & Implements EtcdClient
(#5031)
5eda221 is described below
commit 5eda22171a238e933121b3918c5940e37fb009c5
Author: 김건희 <[email protected]>
AuthorDate: Tue Feb 9 10:23:34 2021 +0900
[New Scheduler] Etcd installation & Implements EtcdClient (#5031)
* Implements EtcdClient
* Add license header
* Add configuration for tests
* Exclude etcd config from controller & invoker
* Add config for etcd test
* Separate etcd test from whisk unit test
* Apply scala compilation
---
ansible/environments/docker-machine/hosts.j2.ini | 5 +
ansible/environments/local/hosts.j2.ini | 3 +
ansible/environments/vagrant/hosts | 3 +
ansible/etcd.yml | 22 ++
ansible/group_vars/all | 26 ++
ansible/roles/etcd/tasks/clean.yml | 25 ++
ansible/roles/etcd/tasks/deploy.yml | 70 ++++++
ansible/roles/etcd/tasks/main.yml | 26 ++
common/scala/build.gradle | 3 +
common/scala/src/main/resources/application.conf | 5 +
.../org/apache/openwhisk/core/WhiskConfig.scala | 5 +
.../apache/openwhisk/core/entity/CreationId.scala | 84 +++++++
.../apache/openwhisk/core/etcd/EtcdClient.scala | 279 +++++++++++++++++++++
.../org/apache/openwhisk/core/etcd/EtcdUtils.scala | 250 ++++++++++++++++++
.../org/apache/openwhisk/http/ErrorResponse.scala | 6 +
tests/build.gradle | 11 +-
tests/src/test/resources/application.conf.j2 | 10 +
.../apache/openwhisk/common/etcd/EtcdKvTests.scala | 74 ++++++
.../common/etcd/EtcdLeaderShipUnitTests.scala | 178 +++++++++++++
19 files changed, 1084 insertions(+), 1 deletion(-)
diff --git a/ansible/environments/docker-machine/hosts.j2.ini
b/ansible/environments/docker-machine/hosts.j2.ini
index 45f327b..e1549e6 100644
--- a/ansible/environments/docker-machine/hosts.j2.ini
+++ b/ansible/environments/docker-machine/hosts.j2.ini
@@ -39,6 +39,11 @@ invoker1 ansible_host={{
docker_machine_ip }}
[elasticsearch:children]
db
+[etcd]
+etcd0 ansible_host={{ docker_machine_ip }}
+{% if mode is defined and 'HA' in mode %}
+etcd1 ansible_host={{ docker_machine_ip }}
+
; define variables
[all:vars]
ansible_connection=ssh
diff --git a/ansible/environments/local/hosts.j2.ini
b/ansible/environments/local/hosts.j2.ini
index 7512d966..cceb706 100644
--- a/ansible/environments/local/hosts.j2.ini
+++ b/ansible/environments/local/hosts.j2.ini
@@ -39,3 +39,6 @@ db
[apigateway]
172.17.0.1 ansible_host=172.17.0.1 ansible_connection=local
+
+[etcd]
+etcd0 ansible_host=172.17.0.1 ansible_connection=local
diff --git a/ansible/environments/vagrant/hosts
b/ansible/environments/vagrant/hosts
index e32f943..fd30391 100644
--- a/ansible/environments/vagrant/hosts
+++ b/ansible/environments/vagrant/hosts
@@ -27,3 +27,6 @@ invoker0 ansible_host=172.17.0.1
ansible_connection=local
[apigateway]
172.17.0.1 ansible_host=172.17.0.1 ansible_connection=local
+
+[etcd]
+etcd0 ansible_host=172.17.0.1 ansible_connection=local
diff --git a/ansible/etcd.yml b/ansible/etcd.yml
new file mode 100644
index 0000000..b167585
--- /dev/null
+++ b/ansible/etcd.yml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+---
+# This playbook deploys Openwhisk Invokers.
+
+- hosts: etcd
+ roles:
+ - etcd
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 564a021..555ad88 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -51,6 +51,7 @@ whisk:
feature_flags:
require_api_key_annotation: "{{ require_api_key_annotation | default(true)
| lower }}"
require_response_payload: "{{ require_response_payload | default(true) |
lower }}"
+ cluster_name: "{{ whisk_cluster_name | default('whisk') }}"
##
# configuration parameters related to support runtimes (see
org.apache.openwhisk.core.entity.ExecManifest for schema of the manifest).
@@ -424,3 +425,28 @@ user_events: "{{ user_events_enabled | default(false) |
lower }}"
durationChecker:
timeWindow: "{{ duration_checker_time_window | default('1 d') }}"
+
+etcd:
+ version: "{{ etcd_version | default('v3.4.0') }}"
+ client:
+ port: 2379
+ server:
+ port: 2480
+ cluster:
+ token: "{{ etcd_cluster_token | default('openwhisk-etcd-token') }}"
+ dir:
+ data: "{{ etcd_data_dir | default('') }}"
+ lease:
+ timeout: "{{ etcd_lease_timeout | default(1) }}"
+ loglevel: "{{ etcd_log_level | default('info') }}"
+ quota_backend_bytes: "{{ etcd_quota_backend_bytes | default(0) }}"
+ snapshot_count: "{{ etcd_snapshot_count | default(100000) }}"
+ auto_compaction_retention: "{{ etcd_auto_compaction_retention | default(1)
}}"
+ auto_compaction_mode: "{{ etcd_auto_compaction_mode | default('periodic') }}"
+ pool_threads: "{{ etcd_pool_threads | default(10) }}"
+
+etcd_connect_string: "{% set ret = [] %}\
+ {% for host in groups['etcd'] %}\
+ {{ ret.append( hostvars[host].ansible_host + ':' +
((etcd.client.port+loop.index-1)|string) ) }}\
+ {% endfor %}\
+ {{ ret | join(',') }}"
diff --git a/ansible/roles/etcd/tasks/clean.yml
b/ansible/roles/etcd/tasks/clean.yml
new file mode 100644
index 0000000..418072b
--- /dev/null
+++ b/ansible/roles/etcd/tasks/clean.yml
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+---
+# Remove etcd containers.
+
+- name: remove etcd
+ docker_container:
+ name: etcd{{ groups['etcd'].index(inventory_hostname) }}
+ keep_volumes: True
+ state: absent
+ ignore_errors: True
diff --git a/ansible/roles/etcd/tasks/deploy.yml
b/ansible/roles/etcd/tasks/deploy.yml
new file mode 100644
index 0000000..63b908c
--- /dev/null
+++ b/ansible/roles/etcd/tasks/deploy.yml
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+---
+# This role will install etcd in group 'etcd' in the environment inventory
+
+- name: "Set the name of the etcd node"
+ set_fact:
+ name: "etcd{{ groups['etcd'].index(inventory_hostname) }}"
+
+- name: "set the volume_dir"
+ set_fact:
+ volume_dir: "{{ etcd.dir.data }}/etcd{{
groups['etcd'].index(inventory_hostname) }}:/etcd-data"
+ when: etcd_data_dir is defined
+
+
+- name: "Set the cluster of the etcd cluster"
+ set_fact:
+ cluster: "{% set etcdhosts = [] %}
+ {% for host in groups['etcd'] %}
+ {{ etcdhosts.append('etcd' + ((loop.index-1)|string) + '=' +
'http://' + hostvars[host].ansible_host + ':' + ((2480+loop.index-1)|string) )
}}
+ {% endfor %}
+ {{ etcdhosts | join(',') }}"
+
+- name: (re)start etcd
+ docker_container:
+ name: etcd{{ groups['etcd'].index(inventory_hostname) }}
+ image: quay.io/coreos/etcd:{{ etcd.version }}
+ state: started
+ recreate: true
+ restart_policy: "{{ docker.restart.policy }}"
+ volumes: "{{volume_dir | default([])}}"
+ command: "/usr/local/bin/etcd \
+ --data-dir=/etcd-data --name '{{ name }}' \
+ --initial-advertise-peer-urls http://{{ ansible_host }}:{{
etcd.server.port + groups['etcd'].index(inventory_hostname) }} \
+ --advertise-client-urls http://{{ ansible_host }}:{{
etcd.client.port + groups['etcd'].index(inventory_hostname) }} \
+ --listen-peer-urls http://0.0.0.0:{{ etcd.server.port +
groups['etcd'].index(inventory_hostname) }} \
+ --listen-client-urls http://0.0.0.0:{{ etcd.client.port +
groups['etcd'].index(inventory_hostname) }} \
+ --initial-cluster {{ cluster }} \
+ --initial-cluster-state new --initial-cluster-token {{
etcd.cluster.token }} \
+ --quota-backend-bytes {{ etcd.quota_backend_bytes }} \
+ --snapshot-count {{ etcd.snapshot_count }} \
+ --auto-compaction-retention {{ etcd.auto_compaction_retention
}} \
+ --auto-compaction-mode {{ etcd.auto_compaction_mode }} \
+ --log-level {{ etcd.loglevel }}"
+ ports:
+ - "{{ etcd.client.port + groups['etcd'].index(inventory_hostname) }}:{{
etcd.client.port + groups['etcd'].index(inventory_hostname) }}"
+ - "{{ etcd.server.port + groups['etcd'].index(inventory_hostname) }}:{{
etcd.server.port + groups['etcd'].index(inventory_hostname) }}"
+ pull: "{{ etcd.pull_etcd | default(true) }}"
+
+- name: wait until etcd in this host is up and running
+ uri:
+ url: "http://{{ ansible_host }}:{{ etcd.client.port +
groups['etcd'].index(inventory_hostname) }}/health"
+ register: result
+ until: result.status == 200
+ retries: 12
+ delay: 5
diff --git a/ansible/roles/etcd/tasks/main.yml
b/ansible/roles/etcd/tasks/main.yml
new file mode 100644
index 0000000..48505a8
--- /dev/null
+++ b/ansible/roles/etcd/tasks/main.yml
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+---
+# This role will install etcd in group 'etcd' in the environment inventory
+# In deploy mode it will deploy etcd containers.
+# In clean mode it will remove etcd containers.
+
+- import_tasks: deploy.yml
+ when: mode == "deploy"
+
+- import_tasks: clean.yml
+ when: mode == "clean"
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 82f2a8c..ffd7480 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -73,6 +73,9 @@ dependencies {
//for mesos
compile "com.adobe.api.platform.runtime:mesos-actor:0.0.17"
+ // for etcd
+ compile("com.ibm.etcd:etcd-java:0.0.13")
+
//tracing support
compile "io.opentracing:opentracing-api:0.31.0"
compile "io.opentracing:opentracing-util:0.31.0"
diff --git a/common/scala/src/main/resources/application.conf
b/common/scala/src/main/resources/application.conf
index ea96e92..a894360 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -383,6 +383,11 @@ whisk {
local-image-prefix = "whisk"
}
+ # cluster name related etcd configuration
+ cluster {
+ name = "whisk"
+ }
+
user-events {
enabled = false
}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 84e75e8..ba15f5f 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -265,6 +265,9 @@ object ConfigKeys {
val controller = s"whisk.controller"
val controllerActivation = s"$controller.activation"
+ val etcd = "whisk.etcd"
+ val etcdPoolThreads = "whisk.etcd.pool.threads"
+
val activationStore = "whisk.activation-store"
val elasticSearchActivationStore = s"$activationStore.elasticsearch"
val activationStoreWithFileStorage = s"$activationStore.with-file-storage"
@@ -286,4 +289,6 @@ object ConfigKeys {
val parameterStorage = "whisk.parameter-storage"
val azBlob = "whisk.azure-blob"
+
+ val whiskClusterName = "whisk.cluster.name"
}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/CreationId.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/CreationId.scala
new file mode 100644
index 0000000..a55fc85
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/CreationId.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.entity
+
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.http.Messages
+import spray.json.DefaultJsonProtocol.StringJsonFormat
+import spray.json.{JsObject, _}
+
+import scala.util.{Failure, Success, Try}
+
+protected[openwhisk] case class CreationId private (val asString: String)
extends AnyVal {
+ override def toString: String = asString
+ def toJsObject: JsObject = JsObject("creationId" -> asString.toJson)
+}
+
+protected[core] object CreationId {
+
+ protected[core] trait CreationIdGenerator {
+ def make(): CreationId = CreationId.generate()
+ }
+
+ /** Checks if the current character is hexadecimal */
+ private def isHexadecimal(c: Char) = c.isDigit || c == 'a' || c == 'b' || c
== 'c' || c == 'd' || c == 'e' || c == 'f'
+
+ /**
+ * Parses an creation id from a string.
+ *
+ * @param id the creation id as string
+ * @return CreationId instance
+ */
+ def parse(id: String): Try[CreationId] = {
+ val length = id.length
+ if (length != 32) {
+ Failure(new
IllegalArgumentException(Messages.creationIdLengthError(SizeError("Creation
id", length.B, 32.B))))
+ } else if (!id.forall(isHexadecimal)) {
+ Failure(new IllegalArgumentException(Messages.creationIdIllegal))
+ } else {
+ Success(new CreationId(id))
+ }
+ }
+
+ /**
+ * Generates a random creation id using java.util.UUID factory.
+ *
+ * Uses fast path to generate the CreationId without additional requirement
checks.
+ *
+ * @return new CreationId
+ */
+ protected[core] def generate(): CreationId = new
CreationId(UUIDs.randomUUID().toString.filterNot(_ == '-'))
+
+ protected[core] implicit val serdes: RootJsonFormat[CreationId] = new
RootJsonFormat[CreationId] {
+ def write(d: CreationId) = JsString(d.toString)
+
+ def read(value: JsValue): CreationId = {
+ val parsed = value match {
+ case JsString(s) => CreationId.parse(s)
+ case JsNumber(n) => CreationId.parse(n.toString)
+ case _ =>
Failure(DeserializationException(Messages.creationIdIllegal))
+ }
+
+ parsed match {
+ case Success(cid) => cid
+ case Failure(t: IllegalArgumentException) =>
deserializationError(t.getMessage)
+ case Failure(_) =>
deserializationError(Messages.creationIdIllegal)
+ }
+ }
+ }
+}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdClient.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdClient.scala
new file mode 100644
index 0000000..1142747
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdClient.scala
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.etcd
+
+import com.google.common.util.concurrent.{FutureCallback, Futures,
ListenableFuture}
+import com.ibm.etcd.api._
+import com.ibm.etcd.client.kv.KvClient.Watch
+import com.ibm.etcd.client.kv.{KvClient, WatchUpdate}
+import com.ibm.etcd.client.{EtcdClient => Client}
+import io.grpc.stub.StreamObserver
+import java.util.concurrent.Executors
+
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.EtcdType._
+import pureconfig.loadConfigOrThrow
+import spray.json.DefaultJsonProtocol
+
+import scala.language.implicitConversions
+import scala.annotation.tailrec
+import scala.concurrent.{ExecutionContextExecutor, Future, Promise}
+
+case class Lease(id: Long, ttl: Long)
+
+object RichListenableFuture {
+ implicit def convertToFuture[T](lf: ListenableFuture[T])(implicit ece:
ExecutionContextExecutor): Future[T] = {
+ val p = Promise[T]()
+ Futures.addCallback(lf, new FutureCallback[T] {
+ def onFailure(t: Throwable): Unit = p failure t
+ def onSuccess(result: T): Unit = p success result
+ }, ece)
+ p.future
+ }
+}
+
+object EtcdClient {
+ // hostAndPorts format: {HOST}:{PORT}[,{HOST}:{PORT},{HOST}:{PORT}, ...]
+ def apply(hostAndPorts: String)(implicit ece: ExecutionContextExecutor):
EtcdClient = {
+ require(hostAndPorts != null)
+ val client: Client =
Client.forEndpoints(hostAndPorts).withPlainText().build()
+ new EtcdClient(client)(ece)
+ }
+
+ def apply(client: Client)(implicit ece: ExecutionContextExecutor):
EtcdClient = {
+ new EtcdClient(client)(ece)
+ }
+}
+
+class EtcdClient(val client: Client)(override implicit val ece:
ExecutionContextExecutor)
+ extends EtcdKeyValueApi
+ with EtcdLeaseApi
+ with EtcdWatchApi
+ with EtcdLeadershipApi {
+
+ def close() = {
+ client.close()
+ }
+}
+
+trait EtcdKeyValueApi extends KeyValueStore {
+ import RichListenableFuture._
+ protected[etcd] val client: Client
+
+ override def get(key: String): Future[RangeResponse] =
+ client.getKvClient.get(key).async()
+
+ override def getPrefix(prefixKey: String): Future[RangeResponse] = {
+ client.getKvClient.get(prefixKey).asPrefix().async()
+ }
+
+ override def getCount(prefixKey: String): Future[Long] = {
+
client.getKvClient.get(prefixKey).asPrefix().countOnly().async().map(_.getCount)
+ }
+
+ override def put(key: String, value: String): Future[PutResponse] =
+ client.getKvClient.put(key, value).async().recoverWith {
+ case t =>
+ Future.failed[PutResponse](getNestedException(t))
+ }
+
+ override def put(key: String, value: String, leaseId: Long):
Future[PutResponse] =
+ client.getKvClient
+ .put(key, value, leaseId)
+ .async()
+ .recoverWith {
+ case t =>
+ Future.failed[PutResponse](getNestedException(t))
+ }
+
+ def put(key: String, value: Boolean): Future[PutResponse] = {
+ put(key, value.toString)
+ }
+
+ def put(key: String, value: Boolean, leaseId: Long): Future[PutResponse] = {
+ put(key, value.toString, leaseId)
+ }
+
+ override def del(key: String): Future[DeleteRangeResponse] =
+ client.getKvClient.delete(key).async().recoverWith {
+ case t =>
+ Future.failed[DeleteRangeResponse](getNestedException(t))
+ }
+
+ override def putTxn[T](key: String, value: T, cmpVersion: Long, leaseId:
Long): Future[TxnResponse] = {
+ client.getKvClient
+ .txnIf()
+ .cmpEqual(key)
+ .version(cmpVersion)
+ .`then`()
+ .put(client.getKvClient
+ .put(key, value.toString, leaseId)
+ .asRequest())
+ .async()
+ .recoverWith {
+ case t =>
+ Future.failed[TxnResponse](getNestedException(t))
+ }
+ }
+
+ @tailrec
+ private def getNestedException(t: Throwable): Throwable = {
+ if (t.getCause == null) t
+ else getNestedException(t.getCause)
+ }
+}
+
+trait KeyValueStore {
+
+ implicit val ece: ExecutionContextExecutor
+
+ def get(key: String): Future[RangeResponse]
+
+ def getPrefix(prefixKey: String): Future[RangeResponse]
+
+ def getCount(prefixKey: String): Future[Long]
+
+ def put(key: String, value: String): Future[PutResponse]
+
+ def put(key: String, value: String, leaseId: Long): Future[PutResponse]
+
+ def del(key: String): Future[DeleteRangeResponse]
+
+ def putTxn[T](key: String, value: T, cmpVersion: Long, leaseId: Long):
Future[TxnResponse]
+}
+
+trait EtcdLeaseApi {
+ import RichListenableFuture._
+ implicit val ece: ExecutionContextExecutor
+
+ protected[etcd] val client: Client
+ protected val DEFAULT_TTL = 2
+
+ def grant(ttl: Long = DEFAULT_TTL): Future[LeaseGrantResponse] = {
+ client.getLeaseClient.grant(ttl).async()
+ }
+
+ def revoke(leaseId: Long): Future[LeaseRevokeResponse] = {
+ client.getLeaseClient.revoke(leaseId)
+ }
+
+ def keepAliveOnce(leaseId: Long): Future[LeaseKeepAliveResponse] = {
+ client.getLeaseClient.keepAliveOnce(leaseId)
+ }
+}
+
+trait EtcdWatchApi {
+ val nThreads = loadConfigOrThrow[Int](ConfigKeys.etcdPoolThreads)
+ val threadpool = Executors.newFixedThreadPool(nThreads);
+ protected[etcd] val client: Client
+
+ def watchAllKeys(next: WatchUpdate => Unit = (_: WatchUpdate) => {},
+ error: Throwable => Unit = (_: Throwable) => {},
+ completed: () => Unit = () => {}): Watch = {
+ client.getKvClient
+ .watch(KvClient.ALL_KEYS)
+ .prevKv()
+ .executor(threadpool)
+ .start(new StreamObserver[WatchUpdate]() {
+ override def onNext(value: WatchUpdate): Unit = {
+ next(value)
+ }
+
+ override def onError(t: Throwable): Unit = {
+ error(t)
+ }
+
+ override def onCompleted(): Unit = {
+ completed()
+ }
+ })
+ }
+
+ def watch(key: String, isPrefix: Boolean = false)(next: WatchUpdate => Unit
= (_: WatchUpdate) => {},
+ error: Throwable => Unit =
(_: Throwable) => {},
+ completed: () => Unit = ()
=> {}): Watch = {
+ val watchRequest = if (isPrefix) {
+ client.getKvClient.watch(key).asPrefix().prevKv()
+ } else {
+ client.getKvClient.watch(key).prevKv()
+ }
+ watchRequest
+ .executor(threadpool)
+ .start(new StreamObserver[WatchUpdate]() {
+ override def onNext(value: WatchUpdate): Unit = {
+ next(value)
+ }
+
+ override def onError(t: Throwable): Unit = {
+ error(t)
+ }
+
+ override def onCompleted(): Unit = {
+ completed()
+ }
+ })
+ }
+
+}
+
+trait EtcdLeadershipApi extends EtcdKeyValueApi with EtcdLeaseApi with
EtcdWatchApi {
+
+ protected[etcd] val client: Client
+
+ val initVersion = 0
+
+ def electLeader(key: String, value: String, timeout: Long = 60):
Future[Either[EtcdFollower, EtcdLeader]] =
+ for {
+ leaseResp <- grant(timeout)
+ txnResp <- putTxn(key, value, initVersion, leaseResp.getID)
+ result <- Future {
+ if (txnResp.getSucceeded) {
+ Right(EtcdLeader(key, value, leaseResp.getID))
+ } else {
+ Left(EtcdFollower(key, value))
+ }
+ }
+ } yield result
+
+ def electLeader(key: String, value: String, lease: Lease):
Future[Either[EtcdFollower, EtcdLeader]] =
+ for {
+ txnResp <- putTxn(key, value, initVersion, lease.id)
+ result <- Future {
+ if (txnResp.getSucceeded) {
+ Right(EtcdLeader(key, value, lease.id))
+ } else {
+ Left(EtcdFollower(key, value))
+ }
+ }
+ } yield result
+
+ def keepAliveLeader(leaseId: Long): Future[Long] =
+ keepAliveOnce(leaseId).map(res => res.getID)
+
+}
+case class EtcdLeader(key: String, value: String, leaseId: Long)
+
+object EtcdLeader extends DefaultJsonProtocol {
+ implicit val serdes = jsonFormat3(EtcdLeader.apply)
+}
+
+case class EtcdFollower(key: String, value: String)
+
+object EtcdFollower extends DefaultJsonProtocol {
+ implicit val serdes = jsonFormat2(EtcdFollower.apply)
+}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdUtils.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdUtils.scala
new file mode 100644
index 0000000..3da9d6b
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdUtils.scala
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.etcd
+
+import java.nio.charset.StandardCharsets
+
+import com.google.protobuf.ByteString
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.containerpool.ContainerId
+import org.apache.openwhisk.core.entity.SizeUnits.MB
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import pureconfig.loadConfigOrThrow
+
+import scala.language.implicitConversions
+import scala.util.Try
+
+case class EtcdConfig(hosts: String)
+
+case class EtcdException(msg: String) extends Exception(msg)
+
+/**
+ * If you import the line below, it implicitly converts ByteString type to
Scala Type.
+ *
+ * import org.apache.openwhisk.core.etcd.EtcdType._
+ */
+object EtcdType {
+
+ implicit def stringToByteString(str: String): ByteString =
ByteString.copyFromUtf8(str)
+
+ implicit def ByteStringToString(byteString: ByteString): String =
byteString.toString(StandardCharsets.UTF_8)
+
+ implicit def ByteStringToInt(byteString: ByteString): Int =
byteString.toString(StandardCharsets.UTF_8).toInt
+
+ implicit def IntToByteString(int: Int): ByteString =
ByteString.copyFromUtf8(int.toString)
+
+ implicit def ByteStringToLong(byteString: ByteString): Long =
byteString.toString(StandardCharsets.UTF_8).toLong
+
+ implicit def LongToByteString(long: Long): ByteString =
ByteString.copyFromUtf8(long.toString)
+
+ implicit def ByteStringToBoolean(byteString: ByteString): Boolean =
+ byteString.toString(StandardCharsets.UTF_8).toBoolean
+
+ implicit def BooleanToByteString(bool: Boolean): ByteString =
ByteString.copyFromUtf8(bool.toString)
+
+ implicit def ByteStringToByteSize(byteString: ByteString): ByteSize =
+ ByteSize(byteString.toString(StandardCharsets.UTF_8).toLong, MB)
+
+ implicit def ByteSizeToByteString(byteSize: ByteSize): ByteString =
ByteString.copyFromUtf8(byteSize.toMB.toString)
+
+}
+
+object EtcdKV {
+
+ val TOP = "\ufff0"
+
+ val clusterName = loadConfigOrThrow[String](ConfigKeys.whiskClusterName)
+
+ object SchedulerKeys {
+ val prefix = s"$clusterName/scheduler"
+
+ val scheduler = s"$prefix"
+
+ /**
+ * The keys for states of schedulers
+ */
+ def scheduler(instanceId: SchedulerInstanceId) =
s"$prefix/${instanceId.asString}"
+
+ }
+
+ object QueueKeys {
+
+ val inProgressPrefix = s"$clusterName/in-progress"
+ val queuePrefix = s"$clusterName/queue"
+
+ /**
+ * The keys for in-progress queue
+ */
+ def inProgressQueue(invocationNamespace: String, fqn:
FullyQualifiedEntityName) =
+ s"$inProgressPrefix/queue/$invocationNamespace/${fqn.copy(version =
None)}"
+
+ /**
+ * The prefix key for state in the queue
+ */
+ def queuePrefix(invocationNamespace: String, fqn:
FullyQualifiedEntityName): String =
+ s"$queuePrefix/$invocationNamespace/${fqn.copy(version = None)}"
+
+ /**
+ * The keys for state in the queue
+ *
+ * Example
+ * - queue/invocationNs/ns/pkg/act/leader
+ * - queue/invocationNs/ns/pkg/act/follower/scheduler1
+ * - queue/invocationNs/ns/pkg/act/follower/scheduler2
+ *
+ */
+ def queue(invocationNamespace: String,
+ fqn: FullyQualifiedEntityName,
+ leader: Boolean,
+ schedulerInstanceId: Option[SchedulerInstanceId] = None): String
= {
+ require(leader || (!leader && schedulerInstanceId.isDefined))
+ val prefix = s"$queuePrefix/$invocationNamespace/${fqn.copy(version =
None)}"
+ if (leader)
+ s"$prefix/leader"
+ else
+ s"$prefix/follower/${schedulerInstanceId.get.asString}"
+ }
+ }
+
+ object ThrottlingKeys {
+ val prefix = s"$clusterName/throttling"
+
+ /**
+ * The keys for namespace throttling
+ */
+ def namespace(namespace: EntityName) = s"$prefix/$namespace"
+
+ /**
+ * The keys for action throttling
+ */
+ def action(invocationNamespace: String, fqn: FullyQualifiedEntityName) =
+ s"$prefix/$invocationNamespace/${fqn.copy(version = None)}"
+
+ /**
+ * The keys for action throttling
+ */
+ def action(invocationNamespace: String, fqn: String) =
s"$prefix/$invocationNamespace/$fqn"
+
+ }
+
+ object ContainerKeys {
+ val namespacePrefix = s"$clusterName/namespace"
+ val inProgressPrefix = s"$clusterName/in-progress"
+ val warmedPrefix = s"$clusterName/warmed"
+
+ /**
+ * The keys for the number of container
+ */
+ def containerPrefix(containerType: String,
+ invocationNamespace: String,
+ fqn: FullyQualifiedEntityName,
+ revision: Option[DocRevision] = None): String =
+ s"$containerType/$invocationNamespace/${fqn.copy(version =
None)}/${revision.map(r => s"$r/").getOrElse("")}"
+
+ /**
+ * The keys for in-progress container
+ *
+ * For count queries, fqn must be at the front.
+ */
+ def inProgressContainer(invocationNamespace: String,
+ fqn: FullyQualifiedEntityName,
+ revision: DocRevision,
+ sid: SchedulerInstanceId,
+ cid: CreationId): String =
+ s"${containerPrefix(inProgressPrefix, invocationNamespace, fqn,
Some(revision))}scheduler/${sid.asString}/creationId/$cid"
+
+ /**
+ * The keys for the number of warmed container
+ */
+ def warmedContainers(invocationNamespace: String,
+ fqn: FullyQualifiedEntityName,
+ revision: DocRevision,
+ invokerInstanceId: InvokerInstanceId,
+ containerId: ContainerId): String =
+ s"${containerPrefix(warmedPrefix, invocationNamespace, fqn,
Some(revision))}invoker/${invokerInstanceId.instance}/container/${containerId.asString}"
+
+ /**
+ * The keys for the number of existing container
+ */
+ def existingContainers(invocationNamespace: String,
+ fqn: FullyQualifiedEntityName,
+ revision: DocRevision,
+ invoker: Option[InvokerInstanceId] = None,
+ containerId: Option[ContainerId] = None): String =
+ containerPrefix(namespacePrefix, invocationNamespace, fqn,
Some(revision)) + invoker
+ .map(id => s"invoker${id.toInt}/")
+ .getOrElse("") + containerId
+ .map(id => s"container/${id.asString}")
+ .getOrElse("")
+
+ /**
+ * The keys for the number of in-progress container by namespace
+ */
+ def inProgressContainerPrefixByNamespace(invocationNamespace: String):
String =
+ s"$inProgressPrefix/$invocationNamespace/"
+
+ /**
+ * The keys for the number of existing container by namespace
+ */
+ def existingContainersPrefixByNamespace(invocationNamespace: String):
String =
+ s"$namespacePrefix/$invocationNamespace/"
+
+ }
+
+ object InvokerKeys {
+ val prefix = s"$clusterName/invokers"
+
+ /**
+ * If displayName only exists in the etcd key, we cannot differentiate it
with the uniqueName
+ */
+ def health(invokerInstanceId: InvokerInstanceId) = {
+ (invokerInstanceId.uniqueName, invokerInstanceId.displayedName) match {
+ case (Some(uniqueName), Some(displayName)) =>
s"$prefix/${invokerInstanceId.toInt}/$uniqueName/$displayName"
+ case (Some(uniqueName), None) =>
s"$prefix/${invokerInstanceId.toInt}/$uniqueName"
+ case _ =>
s"$prefix/${invokerInstanceId.toInt}"
+ }
+ }
+
+ // id is not supposed to be -1
+ private def getId(id: String): Int = {
+ Try { id.toInt } getOrElse (-1)
+ }
+
+ def getInstanceId(invokerKey: String): InvokerInstanceId = {
+ val constructs = invokerKey.split("\\b/+")
+ constructs match {
+ case Array(_, _, id, uniqueName, displayName) =>
+ InvokerInstanceId(getId(id), Some(uniqueName), Some(displayName),
0.B)
+ case Array(_, _, id, uniqueName) =>
+ InvokerInstanceId(getId(id), Some(uniqueName), userMemory = 0.B)
+ case Array(_, _, id) =>
+ InvokerInstanceId(getId(id), userMemory = 0.B)
+ }
+ }
+ }
+
+ object InstanceKeys {
+
+ val instancePrefix = s"$clusterName/instance"
+
+ def instanceLease(instanceId: InstanceId): String =
+ s"$instancePrefix/$instanceId/lease"
+ }
+
+}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala
b/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala
index 5424dd3..695ec17 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala
@@ -99,6 +99,12 @@ object Messages {
s"${error.field} length is ${error.is.toBytes} but must be
${error.allowed.toBytes}."
}
+ /** Standard error for malformed creation id. */
+ val creationIdIllegal = "The creation id is not valid."
+ def creationIdLengthError(error: SizeError) = {
+ s"${error.field} length is ${error.is.toBytes} but must be
${error.allowed.toBytes}."
+ }
+
/** Error messages for sequence actions. */
val sequenceIsTooLong = "Too many actions in the sequence."
val sequenceNoComponent = "No component specified for the sequence."
diff --git a/tests/build.gradle b/tests/build.gradle
index 49fac12..f75800e 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -73,10 +73,11 @@ ext.testSets = [
"org/apache/openwhisk/core/cli/test/**",
"org/apache/openwhisk/core/limits/**",
"org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheck*",
+ "org/apache/openwhisk/common/etcd/**",
"**/*CacheConcurrencyTests*",
"**/*ControllerApiTests*",
"org/apache/openwhisk/testEntities/**",
- 'invokerShoot/**'
+ "invokerShoot/**"
]
],
"REQUIRE_SYSTEM" : [
@@ -197,6 +198,14 @@ task testUnit(type: Test) {
exclude couchDbExcludes
}
+task testUnitEtcd(type: Test) {
+ def etcdUnitIncludes = [
+ "org/apache/openwhisk/common/etcd/**"
+ ]
+
+ include etcdUnitIncludes
+}
+
dependencies {
compile "org.scala-lang:scala-library:${gradle.scala.version}"
compile "org.apache.commons:commons-lang3:3.3.2"
diff --git a/tests/src/test/resources/application.conf.j2
b/tests/src/test/resources/application.conf.j2
index 90802bc..cdae2bd 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -118,6 +118,16 @@ whisk {
password = "{{ db.elasticsearch.auth.admin.password }}"
}
}
+
+ etcd {
+ hosts = "{{ etcd_connect_string }}"
+ lease {
+ timeout = "{{ etcd.lease.timeout }}"
+ }
+ pool {
+ threads = "{{ etcd.pool_threads }}"
+ }
+ }
}
#test-only overrides so that tests can override defaults in application.conf
(todo: move all defaults to reference.conf)
diff --git
a/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdKvTests.scala
b/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdKvTests.scala
new file mode 100644
index 0000000..a93cda0
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdKvTests.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.common.etcd
+
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.entity.InvokerInstanceId
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.etcd.EtcdKV.InvokerKeys
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+import pureconfig.loadConfigOrThrow
+
+@RunWith(classOf[JUnitRunner])
+class EtcdKvTests extends FlatSpec with ScalaFutures with Matchers {
+
+ behavior of "InvokerKeys"
+
+ val clusterName = loadConfigOrThrow[String](ConfigKeys.whiskClusterName)
+ val uniqueName = "myUniqueName"
+ val displayedName = "myDisplayedName"
+
+ it should "serialize a InvokerInstanceId to a health-key if there is only
id" in {
+ val instanceId = InvokerInstanceId(0, userMemory = 0.MB)
+ InvokerKeys.health(instanceId) shouldBe s"$clusterName/invokers/0"
+ }
+
+ it should "serialize a InvokerInstanceId to a health-key if there are id and
unique name" in {
+ val instanceId = InvokerInstanceId(0, Some(uniqueName), userMemory = 0.MB)
+ InvokerKeys.health(instanceId) shouldBe
s"$clusterName/invokers/0/$uniqueName"
+ }
+
+ it should "serialize a InvokerInstanceId to a health-key if there are id,
unique name and displayed name" in {
+ val instanceId = InvokerInstanceId(0, Some(uniqueName),
Some(displayedName), userMemory = 0.MB)
+ InvokerKeys.health(instanceId) shouldBe
s"$clusterName/invokers/0/$uniqueName/$displayedName"
+ }
+
+ it should "deserialize InvokerInstanceId from ETCD key if there is only id"
in {
+ val testKey = "$clusterName/invokers/0"
+ val instanceId = InvokerKeys.getInstanceId(testKey)
+
+ instanceId shouldBe InvokerInstanceId(0, userMemory = 0.MB)
+ }
+
+ it should "deserialize InvokerInstanceId from ETCD key with id and a unique
name" in {
+ val testKey = s"$clusterName/invokers/0/$uniqueName"
+ val instanceId = InvokerKeys.getInstanceId(testKey)
+
+ instanceId shouldBe InvokerInstanceId(0, Some(uniqueName), userMemory =
0.MB)
+ }
+
+ it should "deserialize InvokerInstanceId from ETCD key with id, a unique
name, and a displayed name" in {
+ val testKey = s"$clusterName/invokers/0/$uniqueName/$displayedName"
+ val instanceId = InvokerKeys.getInstanceId(testKey)
+
+ instanceId shouldBe InvokerInstanceId(0, Some(uniqueName),
Some(displayedName), userMemory = 0.MB)
+ }
+}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdLeaderShipUnitTests.scala
b/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdLeaderShipUnitTests.scala
new file mode 100644
index 0000000..816693a
--- /dev/null
+++
b/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdLeaderShipUnitTests.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.common.etcd
+
+import java.util.concurrent.Executor
+import java.{lang, util}
+
+import com.ibm.etcd.api.Event.EventType
+import com.ibm.etcd.api._
+import com.ibm.etcd.client.kv.KvClient.Watch
+import com.ibm.etcd.client.kv.WatchUpdate
+import com.ibm.etcd.client.{EtcdClient => Client}
+import common.{StreamLogging, WskActorSystem}
+import io.grpc.{StatusRuntimeException, Status => GrpcStatus}
+import org.apache.openwhisk.core.etcd.EtcdType._
+import org.apache.openwhisk.core.etcd.{EtcdFollower, EtcdLeader,
EtcdLeadershipApi, Lease}
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContextExecutor, Future}
+
+@RunWith(classOf[JUnitRunner])
+class EtcdLeaderShipUnitTests extends FlatSpec with ScalaFutures with Matchers
with WskActorSystem with StreamLogging {
+
+ implicit val timeout = Timeout(2.seconds)
+ private val leaderKey = "openwhiskleader"
+ private val endpoints = "endpoints"
+ private val leaseId = 60
+
+ class mockWatchUpdate extends WatchUpdate {
+ private var eventLists: util.List[Event] = new util.ArrayList[Event]()
+ override def getHeader: ResponseHeader = ???
+
+ def addEvents(event: Event): WatchUpdate = {
+ eventLists.add(event)
+ this
+ }
+
+ override def getEvents: util.List[Event] = eventLists
+ }
+
+ class MockEtcdLeadershipApi extends EtcdLeadershipApi {
+
+ override implicit val ece: ExecutionContextExecutor =
actorSystem.dispatcher
+
+ override val client: Client = {
+ val hostAndPorts = "172.17.0.1:2379"
+ Client.forEndpoints(hostAndPorts).withPlainText().build()
+ }
+
+ var onNext: WatchUpdate => Unit = null
+
+ override def grant(ttl: Long): Future[LeaseGrantResponse] =
+
Future.successful(LeaseGrantResponse.newBuilder().setID(leaseId).setTTL(ttl).build())
+
+ override def keepAliveOnce(leaseId: Long): Future[LeaseKeepAliveResponse] =
+
Future.successful(LeaseKeepAliveResponse.newBuilder().setID(leaseId).build())
+
+ override def putTxn[T](key: String, value: T, cmpVersion: Long, leaseId:
Long): Future[TxnResponse] =
+ Future.successful(TxnResponse.newBuilder().setSucceeded(true).build())
+
+ override def watch(key: String, isPrefix: Boolean)(next: WatchUpdate =>
Unit,
+ error: Throwable =>
Unit,
+ completed: () => Unit):
Watch = {
+ onNext = next
+ new Watch {
+ override def close(): Unit = {}
+
+ override def addListener(listener: Runnable, executor: Executor): Unit
= {}
+
+ override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
+
+ override def isCancelled: Boolean = true
+
+ override def isDone: Boolean = true
+
+ override def get(): lang.Boolean = true
+
+ override def get(timeout: Long, unit: TimeUnit): lang.Boolean = true
+ }
+ }
+
+ def publishEvents(eventType: EventType, key: String, value: String): Unit
= {
+ val eType = eventType match {
+ case EventType.PUT => EventType.PUT
+ case EventType.DELETE => EventType.DELETE
+ case EventType.UNRECOGNIZED => EventType.UNRECOGNIZED
+ }
+
+ val event = Event
+ .newBuilder()
+ .setType(eType)
+ .setKv(
+ KeyValue
+ .newBuilder()
+ .setKey(key)
+ .setValue(value)
+ .build())
+ .build()
+ onNext(new mockWatchUpdate().addEvents(event))
+ }
+ }
+
+ behavior of "Etcd Leadership Client"
+
+ "Etcd LeaderShip client" should "elect leader successfully" in {
+ val mockLeaderShipClient = new MockEtcdLeadershipApi
+
+ val either = mockLeaderShipClient.electLeader(leaderKey,
endpoints).futureValue(timeout)
+ either.right.get shouldBe EtcdLeader(leaderKey, endpoints, leaseId)
+ }
+
+ "Etcd LeaderShip client" should "be failed to elect leader" in {
+ val mockLeaderShipClient = new MockEtcdLeadershipApi() {
+ override def putTxn[T](key: String, value: T, cmpVersion: Long, leaseId:
Long): Future[TxnResponse] =
+ Future.successful(TxnResponse.newBuilder().setSucceeded(false).build())
+ }
+
+ val either = mockLeaderShipClient.electLeader(leaderKey,
endpoints).futureValue(timeout)
+ either.left.get shouldBe EtcdFollower(leaderKey, endpoints)
+
+ }
+
+ "Etcd LeaderShip client" should "elect leader successfully with provided
lease" in {
+ val mockLeaderShipClient = new MockEtcdLeadershipApi
+
+ val either = mockLeaderShipClient.electLeader(leaderKey, endpoints,
Lease(leaseId, 60)).futureValue(timeout)
+ either.right.get shouldBe EtcdLeader(leaderKey, endpoints, leaseId)
+ }
+
+ "Etcd LeaderShip client" should "be failed to elect leader with provided
lease" in {
+ val mockLeaderShipClient = new MockEtcdLeadershipApi() {
+ override def putTxn[T](key: String, value: T, cmpVersion: Long, leaseId:
Long): Future[TxnResponse] =
+ Future.successful(TxnResponse.newBuilder().setSucceeded(false).build())
+ }
+
+ val either = mockLeaderShipClient.electLeader(leaderKey, endpoints,
Lease(leaseId, 60)).futureValue(timeout)
+ either.left.get shouldBe EtcdFollower(leaderKey, endpoints)
+ }
+
+ "Etcd LeaderShip client" should "throw StatusRuntimeException when provided
lease doesn't exist" in {
+ val mockLeaderShipClient = new MockEtcdLeadershipApi() {
+ override def putTxn[T](key: String, value: T, cmpVersion: Long, leaseId:
Long): Future[TxnResponse] =
+ Future.failed(new StatusRuntimeException(GrpcStatus.NOT_FOUND))
+ }
+
+ mockLeaderShipClient
+ .electLeader(leaderKey, endpoints, Lease(leaseId, 60))
+ .failed
+ .futureValue shouldBe a[StatusRuntimeException]
+ }
+
+ "Etcd LeaderShip client" should "keep alive leader key" in {
+ val mockLeaderShipClient = new MockEtcdLeadershipApi
+
+ mockLeaderShipClient.keepAliveLeader(leaseId).futureValue(timeout)
shouldBe leaseId
+ }
+
+}