This is an automated email from the ASF dual-hosted git repository.
cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new e587bf8 Add couchdb clustering. (#2810)
e587bf8 is described below
commit e587bf8ec632320614f14114c4e85e67f39c06c2
Author: Dominic Kim <[email protected]>
AuthorDate: Wed Nov 29 21:49:36 2017 +0900
Add couchdb clustering. (#2810)
* Add couchdb clustering
---
ansible/group_vars/all | 3 +-
ansible/roles/couchdb/tasks/deploy.yml | 56 +++++-
ansible/templates/whisk.properties.j2 | 2 +
.../src/main/scala/whisk/core/WhiskConfig.scala | 3 +-
tests/src/test/scala/common/WhiskProperties.java | 16 +-
tests/src/test/scala/ha/ShootComponentsTests.scala | 198 +++++++++++++++++++--
6 files changed, 251 insertions(+), 27 deletions(-)
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index f1a746d..5b2ea44 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -213,6 +213,7 @@ nginx:
# The key db.whisk.auth is the name of the authentication database where all
keys of all users are stored.
# The db_prefix is defined for each environment on its own. The CouchDb
credentials are also defined for each environment on its own.
db:
+ instances: "{{ groups['db'] | length }}"
authkeys:
- guest
- whisk.system
@@ -239,7 +240,7 @@ linux:
version: 4.4.0-31
couchdb:
- version: 2.0
+ version: 2.1
docker:
# The user to install docker for. Defaults to the ansible user if not set.
This will be the user who is able to run
diff --git a/ansible/roles/couchdb/tasks/deploy.yml
b/ansible/roles/couchdb/tasks/deploy.yml
index 27de148..88ec1eb 100644
--- a/ansible/roles/couchdb/tasks/deploy.yml
+++ b/ansible/roles/couchdb/tasks/deploy.yml
@@ -1,9 +1,9 @@
---
# This role will run a CouchDB server on the db group
-- name: "Set node name to couchdb{{ groups['db'].index(inventory_hostname) }}"
+- name: "Set the coordinator to the first node"
set_fact:
- nodeName: "couchdb{{ groups['db'].index(inventory_hostname) }}"
+ coordinator: "{{ groups['db'][0] }}"
- name: check if db credentials are valid for CouchDB
fail: msg="The db provider in your {{ inventory_dir }}/group_vars/all is {{
db_provider }}, it has to be CouchDB, pls double check"
@@ -21,25 +21,27 @@
volume_dir: "{{ instance.volume.fsmount | default( '/mnt/' +
group_names|first, true ) }}:/usr/local/var/lib/couchdb"
when: (block_device is defined) and (block_device in disk_status.stdout)
-- name: "pull the klaemo/couchdb:{{ couchdb.version }} image"
- shell: "docker pull klaemo/couchdb:{{ couchdb.version }}"
+- name: "pull the apache/couchdb:{{ couchdb.version }} image"
+ shell: "docker pull apache/couchdb:{{ couchdb.version }}"
retries: "{{ docker.pull.retries }}"
delay: "{{ docker.pull.delay }}"
- name: (re)start CouchDB
docker_container:
name: couchdb
- image: klaemo/couchdb:{{ couchdb.version }}
+ image: apache/couchdb:{{ couchdb.version }}
state: started
recreate: true
restart_policy: "{{ docker.restart.policy }}"
volumes: "{{volume_dir | default([])}}"
ports:
- "{{ db_port }}:5984"
+ - "4369:4369"
+ - "9100:9100"
env:
COUCHDB_USER: "{{ db_username }}"
COUCHDB_PASSWORD: "{{ db_password }}"
- NODENAME: "{{ nodeName }}"
+ NODENAME: "{{ ansible_host }}"
- name: wait until CouchDB in this host is up and running
uri:
@@ -49,9 +51,48 @@
retries: 12
delay: 5
+- name: enable the cluster setup mode
+ uri:
+ url: "{{ db_protocol }}://{{ ansible_host }}:{{ db_port }}/_cluster_setup"
+ method: POST
+ body: >
+ {"action": "enable_cluster", "bind_address":"0.0.0.0", "username": "{{
db_username }}", "password":"{{ db_password }}", "port": {{ db_port }},
"node_count": "{{ groups['db'] | length }}", "remote_node": "{{ ansible_host
}}", "remote_current_user": "{{ db_username }}", "remote_current_password": "{{
db_password }}"}
+ body_format: json
+ status_code: 201
+ user: "{{ db_username }}"
+ password: "{{ db_password }}"
+ force_basic_auth: yes
+ when: inventory_hostname == coordinator
+
+- name: add remote nodes to the cluster
+ uri:
+ url: "{{ db_protocol }}://{{ coordinator }}:{{ db_port }}/_cluster_setup"
+ method: POST
+ body: >
+ {"action": "add_node", "host":"{{ ansible_host }}", "port": {{ db_port
}}, "username": "{{ db_username }}", "password":"{{ db_password }}"}
+ body_format: json
+ status_code: 201
+ user: "{{ db_username }}"
+ password: "{{ db_password }}"
+ force_basic_auth: yes
+ when: inventory_hostname != coordinator
+
+- name: finish the cluster setup mode
+ uri:
+ url: "{{ db_protocol }}://{{ ansible_host }}:{{ db_port }}/_cluster_setup"
+ method: POST
+ body: >
+ {"action": "finish_cluster"}
+ body_format: json
+ status_code: 201
+ user: "{{ db_username }}"
+ password: "{{ db_password }}"
+ force_basic_auth: yes
+ when: inventory_hostname == coordinator
+
- name: disable reduce limit on views
uri:
- url: "{{ db_protocol }}://{{ ansible_host }}:{{ db_port
}}/_node/couchdb@{{ nodeName }}/_config/query_server_config/reduce_limit"
+ url: "{{ db_protocol }}://{{ ansible_host }}:{{ db_port
}}/_node/couchdb@{{ ansible_host }}/_config/query_server_config/reduce_limit"
method: PUT
body: >
"false"
@@ -60,3 +101,4 @@
user: "{{ db_username }}"
password: "{{ db_password }}"
force_basic_auth: yes
+ when: inventory_hostname == coordinator
diff --git a/ansible/templates/whisk.properties.j2
b/ansible/templates/whisk.properties.j2
index 8ae7b8c..925c246 100644
--- a/ansible/templates/whisk.properties.j2
+++ b/ansible/templates/whisk.properties.j2
@@ -92,6 +92,8 @@ db.whisk.activations={{ db.whisk.activations }}
db.whisk.actions.ddoc={{ db.whisk.actions_ddoc }}
db.whisk.activations.ddoc={{ db.whisk.activations_ddoc }}
db.whisk.activations.filter.ddoc={{ db.whisk.activations_filter_ddoc }}
+db.hostsList={{ groups["db"] | map('extract', hostvars, 'ansible_host') | list
| join(",") }}
+db.instances={{ db.instances }}
apigw.auth.user={{apigw_auth_user}}
apigw.auth.pwd={{apigw_auth_pwd}}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index c08fcf1..36c17af 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -94,7 +94,6 @@ class WhiskConfig(requiredProperties: Map[String, String],
val dbAuths = this(WhiskConfig.dbAuths)
val dbWhisk = this(WhiskConfig.dbWhisk)
val dbActivations = this(WhiskConfig.dbActivations)
-
val mainDockerEndpoint = this(WhiskConfig.mainDockerEndpoint)
val kafkaTopicsInvokerRetentionBytes =
this(WhiskConfig.kafkaTopicsInvokerRetentionBytes)
@@ -222,6 +221,7 @@ object WhiskConfig {
val controllerBlackboxFraction = "controller.blackboxFraction"
val controllerInstances = "controller.instances"
+ val dbInstances = "db.instances"
val loadbalancerInvokerBusyThreshold = "loadbalancer.invokerBusyThreshold"
@@ -233,6 +233,7 @@ object WhiskConfig {
val redisHostPort = "redis.host.port"
val invokerHostsList = "invoker.hosts"
+ val dbHostsList = "db.hostsList"
val edgeHost = Map(edgeHostName -> null, edgeHostApiPort -> null)
val invokerHosts = Map(invokerHostsList -> null)
diff --git a/tests/src/test/scala/common/WhiskProperties.java
b/tests/src/test/scala/common/WhiskProperties.java
index 0971b23..b770c1b 100644
--- a/tests/src/test/scala/common/WhiskProperties.java
+++ b/tests/src/test/scala/common/WhiskProperties.java
@@ -17,8 +17,6 @@
package common;
-import static org.junit.Assert.assertTrue;
-
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -26,6 +24,8 @@ import java.io.InputStream;
import java.nio.file.Files;
import java.util.Properties;
+import static org.junit.Assert.assertTrue;
+
/**
* Properties that describe a whisk installation
*/
@@ -231,6 +231,14 @@ public class WhiskProperties {
return whiskProperties.getProperty("controller.hosts");
}
+ public static String getDBHosts() {
+ return whiskProperties.getProperty("db.hostsList");
+ }
+
+ public static int getDBPort() {
+ return Integer.parseInt(whiskProperties.getProperty("db.port"));
+ }
+
public static int getControllerBasePort() {
return
Integer.parseInt(whiskProperties.getProperty("controller.host.basePort"));
}
@@ -239,6 +247,10 @@ public class WhiskProperties {
return getControllerHosts().split(",")[0];
}
+ public static String getBaseDBHost() {
+ return getDBHosts().split(",")[0];
+ }
+
public static String getBaseControllerAddress() {
return getBaseControllerHost() + ":" + getControllerBasePort();
}
diff --git a/tests/src/test/scala/ha/ShootComponentsTests.scala
b/tests/src/test/scala/ha/ShootComponentsTests.scala
index 5fd1525..c6e65dc 100644
--- a/tests/src/test/scala/ha/ShootComponentsTests.scala
+++ b/tests/src/test/scala/ha/ShootComponentsTests.scala
@@ -19,32 +19,35 @@ package ha
import java.io.File
import java.time.Instant
-import scala.concurrent.Future
+import scala.concurrent.{Await, Future}
import scala.concurrent.duration.DurationInt
import scala.util.Try
-
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
-
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
-import common.TestUtils
-import common.WhiskProperties
+import common._
import common.rest.WskRest
-import common.WskActorSystem
-import common.WskProps
-import common.WskTestHelpers
+import spray.json._
+import spray.json.DefaultJsonProtocol._
import whisk.core.WhiskConfig
+import whisk.core.database.test.ExtendedCouchDbRestClient
import whisk.utils.retry
@RunWith(classOf[JUnitRunner])
-class ShootComponentsTests extends FlatSpec with Matchers with WskTestHelpers
with ScalaFutures with WskActorSystem {
+class ShootComponentsTests
+ extends FlatSpec
+ with Matchers
+ with WskTestHelpers
+ with ScalaFutures
+ with WskActorSystem
+ with StreamLogging {
implicit val wskprops = WskProps()
val wsk = new WskRest
@@ -63,18 +66,49 @@ class ShootComponentsTests extends FlatSpec with Matchers
with WskTestHelpers wi
val controller0DockerHost = WhiskProperties.getBaseControllerHost() + ":" +
WhiskProperties.getProperty(
WhiskConfig.dockerPort)
- def restartComponent(host: String, component: String) = {
+ val couchDB0DockerHost = WhiskProperties.getBaseDBHost() + ":" +
WhiskProperties.getProperty(WhiskConfig.dockerPort)
+
+ val dbProtocol = WhiskProperties.getProperty(WhiskConfig.dbProtocol)
+ val dbHostsList = WhiskProperties.getDBHosts
+ val dbPort = WhiskProperties.getProperty(WhiskConfig.dbPort)
+ val dbUsername = WhiskProperties.getProperty(WhiskConfig.dbUsername)
+ val dbPassword = WhiskProperties.getProperty(WhiskConfig.dbPassword)
+ val dbPrefix = WhiskProperties.getProperty(WhiskConfig.dbPrefix)
+ val dbWhiskAuth = WhiskProperties.getProperty(WhiskConfig.dbAuths)
+
+ private def getDockerCommand(host: String, component: String, cmd: String) =
{
def file(path: String) = Try(new
File(path)).filter(_.exists).map(_.getAbsolutePath).toOption
+
val docker = (file("/usr/bin/docker") orElse
file("/usr/local/bin/docker")).getOrElse("docker")
- val cmd = Seq(docker, "--host", host, "restart", component)
+ Seq(docker, "--host", host, cmd, component)
+ }
+
+ def restartComponent(host: String, component: String) = {
+ val cmd: Seq[String] = getDockerCommand(host, component, "restart")
+ println(s"Running command: ${cmd.mkString(" ")}")
+
+ TestUtils.runCmd(0, new File("."), cmd: _*)
+ }
+
+ def stopComponent(host: String, component: String) = {
+ val cmd: Seq[String] = getDockerCommand(host, component, "stop")
+ println(s"Running command: ${cmd.mkString(" ")}")
+
+ TestUtils.runCmd(0, new File("."), cmd: _*)
+ }
+
+ def startComponent(host: String, component: String) = {
+ val cmd: Seq[String] = getDockerCommand(host, component, "start")
println(s"Running command: ${cmd.mkString(" ")}")
TestUtils.runCmd(0, new File("."), cmd: _*)
}
- def ping(host: String, port: Int) = {
- val response = Try { Http().singleRequest(HttpRequest(uri =
s"http://$host:$port/ping")).futureValue }.toOption
+ def ping(host: String, port: Int, path: String = "/") = {
+ val response = Try {
+ Http().singleRequest(HttpRequest(uri =
s"http://$host:$port$path")).futureValue
+ }.toOption
response.map { res =>
(res.status, Unmarshal(res).to[String].futureValue)
@@ -87,17 +121,34 @@ class ShootComponentsTests extends FlatSpec with Matchers
with WskTestHelpers wi
val host =
WhiskProperties.getProperty("controller.hosts").split(",")(instance)
val port = WhiskProperties.getControllerBasePort + instance
- val res = ping(host, port)
+ val res = ping(host, port, "/ping")
res == Some((StatusCodes.OK, "pong"))
}
+ def isDBAlive(instance: Int): Boolean = {
+ require(instance >= 0 && instance < 2, "DB instance not known.")
+
+ val host = WhiskProperties.getProperty("db.hosts").split(",")(instance)
+ val port = WhiskProperties.getDBPort + instance
+
+ val res = ping(host, port)
+ res == Some(
+ (
+ StatusCodes.OK,
+
"{\"couchdb\":\"Welcome\",\"version\":\"2.1.1\",\"features\":[\"scheduler\"],\"vendor\":{\"name\":\"The
Apache Software Foundation\"}}\n"))
+ }
+
def doRequests(amount: Int, actionName: String): Seq[(Int, Int)] = {
(0 until amount).map { i =>
val start = Instant.now
// Do POSTs and GETs
- val invokeExit = Future { wsk.action.invoke(actionName, expectedExitCode
= TestUtils.DONTCARE_EXIT).exitCode }
- val getExit = Future { wsk.action.get(actionName, expectedExitCode =
TestUtils.DONTCARE_EXIT).exitCode }
+ val invokeExit = Future {
+ wsk.action.invoke(actionName, expectedExitCode =
TestUtils.DONTCARE_EXIT).exitCode
+ }
+ val getExit = Future {
+ wsk.action.get(actionName, expectedExitCode =
TestUtils.DONTCARE_EXIT).exitCode
+ }
println(s"Done rerquests with responses: invoke:
${invokeExit.futureValue} and get: ${getExit.futureValue}")
@@ -150,4 +201,119 @@ class ShootComponentsTests extends FlatSpec with Matchers
with WskTestHelpers wi
isControllerAlive(1) shouldBe true
}
}
+
+ behavior of "CouchDB HA"
+
+ it should "be able to retrieve documents from couchdb1 if couchdb0 goes
down" in withAssetCleaner(wskprops) {
+ (wp, assetHelper) =>
+ if (WhiskProperties.getProperty(WhiskConfig.dbInstances).toInt >= 2) {
+
+ val dbName: String = dbWhiskAuth
+ val db1 = new ExtendedCouchDbRestClient(
+ dbProtocol,
+ dbHostsList.split(",")(0),
+ dbPort.toInt,
+ dbUsername,
+ dbPassword,
+ dbName)
+ val db2 = new ExtendedCouchDbRestClient(
+ dbProtocol,
+ dbHostsList.split(",")(1),
+ dbPort.toInt,
+ dbUsername,
+ dbPassword,
+ dbName)
+
+ println("Creating test document")
+ val docId = "couchdb-ha-test"
+ val testDocument = JsObject(
+ "_id" -> docId.toJson,
+ "namespaces" -> JsArray(
+ JsObject(
+ "name" -> docId.toJson,
+ "uuid" -> "789c46b1-71f6-4ed5-8c54-816aa4f8c502".toJson,
+ "key" ->
"abczO3xZCLrMN6v2BKK1dXYFpXlPkccOFqm12CdAsMgRU4VrNZ9lyGVCGuMDGIwP".toJson)),
+ "subject" -> docId.toJson)
+
+ val docId2 = "couchdb-ha-test2"
+ val testDocument2 = JsObject(
+ "_id" -> docId2.toJson,
+ "namespaces" -> JsArray(
+ JsObject(
+ "name" -> docId2.toJson,
+ "uuid" -> "789c46b1-71f6-4ed5-8c54-816aa4f8c502".toJson,
+ "key" ->
"abczO3xZCLrMN6v2BKK1dXYFpXlPkccOFqm12CdAsMgRU4VrNZ9lyGVCGuMDGIwP".toJson)),
+ "subject" -> docId2.toJson)
+
+ isDBAlive(0) shouldBe true
+
+ retry(db1.putDoc(docId, testDocument))
+
+ stopComponent(couchDB0DockerHost, "couchdb")
+
+ retry({
+ isDBAlive(0) shouldBe false
+ }, 100, Some(100.milliseconds))
+
+ retry({
+ val result = Await.result(db2.getDoc(docId), 15.seconds)
+ result should be('right)
+ result.right.get.getFields("_id") shouldBe
testDocument.getFields("_id")
+ })
+
+ retry(
+ {
+ val result = Await.result(
+ db2.executeView("subjects", "identities")(startKey =
List(docId), endKey = List(docId)),
+ 15.seconds)
+ result should be('right)
+ result.right.get.getFields("_id") shouldBe
testDocument.getFields("namespace")
+ },
+ 100,
+ Some(100.milliseconds))
+
+ retry(db2.putDoc(docId2, testDocument2))
+
+ isDBAlive(0) shouldBe false
+
+ startComponent(couchDB0DockerHost, "couchdb")
+
+ retry({
+ isDBAlive(0) shouldBe true
+ }, 100, Some(100.milliseconds))
+
+ retry({
+ val result = Await.result(db1.getDoc(docId2), 15.seconds)
+ result should be('right)
+ result.right.get.getFields("_id") shouldBe
testDocument2.getFields("_id")
+ })
+
+ retry(
+ {
+ val result = Await.result(
+ db1.executeView("subjects", "identities")(startKey =
List(docId2), endKey = List(docId2)),
+ 15.seconds)
+ result should be('right)
+ result.right.get.getFields("_id") shouldBe
testDocument2.getFields("namespace")
+ },
+ 100,
+ Some(100.milliseconds))
+
+ val doc1Result = Await.result(db1.getDoc(docId), 15.seconds)
+ val doc2Result = Await.result(db1.getDoc(docId2), 15.seconds)
+ val rev1 =
doc1Result.right.get.fields.get("_rev").get.convertTo[String]
+ val rev2 =
doc2Result.right.get.fields.get("_rev").get.convertTo[String]
+ Await.result(db1.deleteDoc(docId, rev1), 15.seconds)
+ Await.result(db1.deleteDoc(docId2, rev2), 15.seconds)
+
+ retry({
+ val result = Await.result(db1.getDoc(docId), 15.seconds)
+ result should be('left)
+ })
+ retry({
+ val result = Await.result(db1.getDoc(docId2), 15.seconds)
+ result should be('left)
+ })
+ }
+ }
}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].