This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new e587bf8  Add couchdb clustering. (#2810)
e587bf8 is described below

commit e587bf8ec632320614f14114c4e85e67f39c06c2
Author: Dominic Kim <[email protected]>
AuthorDate: Wed Nov 29 21:49:36 2017 +0900

    Add couchdb clustering. (#2810)
    
    * Add couchdb clustering
---
 ansible/group_vars/all                             |   3 +-
 ansible/roles/couchdb/tasks/deploy.yml             |  56 +++++-
 ansible/templates/whisk.properties.j2              |   2 +
 .../src/main/scala/whisk/core/WhiskConfig.scala    |   3 +-
 tests/src/test/scala/common/WhiskProperties.java   |  16 +-
 tests/src/test/scala/ha/ShootComponentsTests.scala | 198 +++++++++++++++++++--
 6 files changed, 251 insertions(+), 27 deletions(-)

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index f1a746d..5b2ea44 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -213,6 +213,7 @@ nginx:
 # The key db.whisk.auth is the name of the authentication database where all 
keys of all users are stored.
 # The db_prefix is defined for each environment on its own. The CouchDb 
credentials are also defined for each environment on its own.
 db:
+  instances: "{{ groups['db'] | length }}"
   authkeys:
   - guest
   - whisk.system
@@ -239,7 +240,7 @@ linux:
   version: 4.4.0-31
 
 couchdb:
-  version: 2.0
+  version: 2.1
 
 docker:
   # The user to install docker for. Defaults to the ansible user if not set. 
This will be the user who is able to run
diff --git a/ansible/roles/couchdb/tasks/deploy.yml 
b/ansible/roles/couchdb/tasks/deploy.yml
index 27de148..88ec1eb 100644
--- a/ansible/roles/couchdb/tasks/deploy.yml
+++ b/ansible/roles/couchdb/tasks/deploy.yml
@@ -1,9 +1,9 @@
 ---
 # This role will run a CouchDB server on the db group
 
-- name: "Set node name to couchdb{{ groups['db'].index(inventory_hostname) }}"
+- name: "Set the coordinator to the first node"
   set_fact:
-    nodeName: "couchdb{{ groups['db'].index(inventory_hostname) }}"
+    coordinator: "{{ groups['db'][0] }}"
 
 - name: check if db credentials are valid for CouchDB
   fail: msg="The db provider in your {{ inventory_dir }}/group_vars/all is {{ 
db_provider }}, it has to be CouchDB, pls double check"
@@ -21,25 +21,27 @@
     volume_dir: "{{ instance.volume.fsmount | default( '/mnt/' + 
group_names|first, true ) }}:/usr/local/var/lib/couchdb"
   when: (block_device is defined) and (block_device in disk_status.stdout)
 
-- name: "pull the klaemo/couchdb:{{ couchdb.version }} image"
-  shell: "docker pull klaemo/couchdb:{{ couchdb.version }}"
+- name: "pull the apache/couchdb:{{ couchdb.version }} image"
+  shell: "docker pull apache/couchdb:{{ couchdb.version }}"
   retries: "{{ docker.pull.retries }}"
   delay: "{{ docker.pull.delay }}"
 
 - name: (re)start CouchDB
   docker_container:
     name: couchdb
-    image: klaemo/couchdb:{{ couchdb.version }}
+    image: apache/couchdb:{{ couchdb.version }}
     state: started
     recreate: true
     restart_policy: "{{ docker.restart.policy }}"
     volumes: "{{volume_dir | default([])}}"
     ports:
       - "{{ db_port }}:5984"
+      - "4369:4369"
+      - "9100:9100"
     env:
       COUCHDB_USER: "{{ db_username }}"
       COUCHDB_PASSWORD: "{{ db_password }}"
-      NODENAME: "{{ nodeName }}"
+      NODENAME: "{{ ansible_host }}"
 
 - name: wait until CouchDB in this host is up and running
   uri:
@@ -49,9 +51,48 @@
   retries: 12
   delay: 5
 
+- name: enable the cluster setup mode
+  uri:
+    url: "{{ db_protocol }}://{{ ansible_host }}:{{ db_port }}/_cluster_setup"
+    method: POST
+    body: >
+        {"action": "enable_cluster", "bind_address":"0.0.0.0", "username": "{{ 
db_username }}", "password":"{{ db_password }}", "port": {{ db_port }}, 
"node_count": "{{ groups['db'] | length }}", "remote_node": "{{ ansible_host 
}}", "remote_current_user": "{{ db_username }}", "remote_current_password": "{{ 
db_password }}"}
+    body_format: json
+    status_code: 201
+    user: "{{ db_username }}"
+    password: "{{ db_password }}"
+    force_basic_auth: yes
+  when: inventory_hostname == coordinator
+
+- name: add remote nodes to the cluster
+  uri:
+    url: "{{ db_protocol }}://{{ coordinator }}:{{ db_port }}/_cluster_setup"
+    method: POST
+    body: >
+        {"action": "add_node", "host":"{{ ansible_host }}", "port": {{ db_port 
}}, "username": "{{ db_username }}", "password":"{{ db_password }}"}
+    body_format: json
+    status_code: 201
+    user: "{{ db_username }}"
+    password: "{{ db_password }}"
+    force_basic_auth: yes
+  when: inventory_hostname != coordinator
+
+- name: finish the cluster setup mode
+  uri:
+    url: "{{ db_protocol }}://{{ ansible_host }}:{{ db_port }}/_cluster_setup"
+    method: POST
+    body: >
+        {"action": "finish_cluster"}
+    body_format: json
+    status_code: 201
+    user: "{{ db_username }}"
+    password: "{{ db_password }}"
+    force_basic_auth: yes
+  when: inventory_hostname == coordinator
+
 - name: disable reduce limit on views
   uri:
-    url: "{{ db_protocol }}://{{ ansible_host }}:{{ db_port 
}}/_node/couchdb@{{ nodeName }}/_config/query_server_config/reduce_limit"
+    url: "{{ db_protocol }}://{{ ansible_host }}:{{ db_port 
}}/_node/couchdb@{{ ansible_host }}/_config/query_server_config/reduce_limit"
     method: PUT
     body: >
         "false"
@@ -60,3 +101,4 @@
     user: "{{ db_username }}"
     password: "{{ db_password }}"
     force_basic_auth: yes
+  when: inventory_hostname == coordinator
diff --git a/ansible/templates/whisk.properties.j2 
b/ansible/templates/whisk.properties.j2
index 8ae7b8c..925c246 100644
--- a/ansible/templates/whisk.properties.j2
+++ b/ansible/templates/whisk.properties.j2
@@ -92,6 +92,8 @@ db.whisk.activations={{ db.whisk.activations }}
 db.whisk.actions.ddoc={{ db.whisk.actions_ddoc }}
 db.whisk.activations.ddoc={{ db.whisk.activations_ddoc }}
 db.whisk.activations.filter.ddoc={{ db.whisk.activations_filter_ddoc }}
+db.hostsList={{ groups["db"] | map('extract', hostvars, 'ansible_host') | list 
| join(",") }}
+db.instances={{ db.instances }}
 
 apigw.auth.user={{apigw_auth_user}}
 apigw.auth.pwd={{apigw_auth_pwd}}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala 
b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index c08fcf1..36c17af 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -94,7 +94,6 @@ class WhiskConfig(requiredProperties: Map[String, String],
   val dbAuths = this(WhiskConfig.dbAuths)
   val dbWhisk = this(WhiskConfig.dbWhisk)
   val dbActivations = this(WhiskConfig.dbActivations)
-
   val mainDockerEndpoint = this(WhiskConfig.mainDockerEndpoint)
 
   val kafkaTopicsInvokerRetentionBytes = 
this(WhiskConfig.kafkaTopicsInvokerRetentionBytes)
@@ -222,6 +221,7 @@ object WhiskConfig {
 
   val controllerBlackboxFraction = "controller.blackboxFraction"
   val controllerInstances = "controller.instances"
+  val dbInstances = "db.instances"
 
   val loadbalancerInvokerBusyThreshold = "loadbalancer.invokerBusyThreshold"
 
@@ -233,6 +233,7 @@ object WhiskConfig {
   val redisHostPort = "redis.host.port"
 
   val invokerHostsList = "invoker.hosts"
+  val dbHostsList = "db.hostsList"
 
   val edgeHost = Map(edgeHostName -> null, edgeHostApiPort -> null)
   val invokerHosts = Map(invokerHostsList -> null)
diff --git a/tests/src/test/scala/common/WhiskProperties.java 
b/tests/src/test/scala/common/WhiskProperties.java
index 0971b23..b770c1b 100644
--- a/tests/src/test/scala/common/WhiskProperties.java
+++ b/tests/src/test/scala/common/WhiskProperties.java
@@ -17,8 +17,6 @@
 
 package common;
 
-import static org.junit.Assert.assertTrue;
-
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -26,6 +24,8 @@ import java.io.InputStream;
 import java.nio.file.Files;
 import java.util.Properties;
 
+import static org.junit.Assert.assertTrue;
+
 /**
  * Properties that describe a whisk installation
  */
@@ -231,6 +231,14 @@ public class WhiskProperties {
         return whiskProperties.getProperty("controller.hosts");
     }
 
+    public static String getDBHosts() {
+        return whiskProperties.getProperty("db.hostsList");
+    }
+
+    public static int getDBPort() {
+        return Integer.parseInt(whiskProperties.getProperty("db.port"));
+    }
+
     public static int getControllerBasePort() {
         return 
Integer.parseInt(whiskProperties.getProperty("controller.host.basePort"));
     }
@@ -239,6 +247,10 @@ public class WhiskProperties {
         return getControllerHosts().split(",")[0];
     }
 
+    public static String getBaseDBHost() {
+        return getDBHosts().split(",")[0];
+    }
+
     public static String getBaseControllerAddress() {
         return getBaseControllerHost() + ":" + getControllerBasePort();
     }
diff --git a/tests/src/test/scala/ha/ShootComponentsTests.scala 
b/tests/src/test/scala/ha/ShootComponentsTests.scala
index 5fd1525..c6e65dc 100644
--- a/tests/src/test/scala/ha/ShootComponentsTests.scala
+++ b/tests/src/test/scala/ha/ShootComponentsTests.scala
@@ -19,32 +19,35 @@ package ha
 import java.io.File
 import java.time.Instant
 
-import scala.concurrent.Future
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration.DurationInt
 import scala.util.Try
-
 import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.junit.JUnitRunner
-
 import akka.http.scaladsl.Http
 import akka.http.scaladsl.model.HttpRequest
 import akka.http.scaladsl.model.StatusCodes
 import akka.http.scaladsl.unmarshalling.Unmarshal
 import akka.stream.ActorMaterializer
-import common.TestUtils
-import common.WhiskProperties
+import common._
 import common.rest.WskRest
-import common.WskActorSystem
-import common.WskProps
-import common.WskTestHelpers
+import spray.json._
+import spray.json.DefaultJsonProtocol._
 import whisk.core.WhiskConfig
+import whisk.core.database.test.ExtendedCouchDbRestClient
 import whisk.utils.retry
 
 @RunWith(classOf[JUnitRunner])
-class ShootComponentsTests extends FlatSpec with Matchers with WskTestHelpers 
with ScalaFutures with WskActorSystem {
+class ShootComponentsTests
+    extends FlatSpec
+    with Matchers
+    with WskTestHelpers
+    with ScalaFutures
+    with WskActorSystem
+    with StreamLogging {
 
   implicit val wskprops = WskProps()
   val wsk = new WskRest
@@ -63,18 +66,49 @@ class ShootComponentsTests extends FlatSpec with Matchers 
with WskTestHelpers wi
   val controller0DockerHost = WhiskProperties.getBaseControllerHost() + ":" + 
WhiskProperties.getProperty(
     WhiskConfig.dockerPort)
 
-  def restartComponent(host: String, component: String) = {
+  val couchDB0DockerHost = WhiskProperties.getBaseDBHost() + ":" + 
WhiskProperties.getProperty(WhiskConfig.dockerPort)
+
+  val dbProtocol = WhiskProperties.getProperty(WhiskConfig.dbProtocol)
+  val dbHostsList = WhiskProperties.getDBHosts
+  val dbPort = WhiskProperties.getProperty(WhiskConfig.dbPort)
+  val dbUsername = WhiskProperties.getProperty(WhiskConfig.dbUsername)
+  val dbPassword = WhiskProperties.getProperty(WhiskConfig.dbPassword)
+  val dbPrefix = WhiskProperties.getProperty(WhiskConfig.dbPrefix)
+  val dbWhiskAuth = WhiskProperties.getProperty(WhiskConfig.dbAuths)
+
+  private def getDockerCommand(host: String, component: String, cmd: String) = 
{
     def file(path: String) = Try(new 
File(path)).filter(_.exists).map(_.getAbsolutePath).toOption
+
     val docker = (file("/usr/bin/docker") orElse 
file("/usr/local/bin/docker")).getOrElse("docker")
 
-    val cmd = Seq(docker, "--host", host, "restart", component)
+    Seq(docker, "--host", host, cmd, component)
+  }
+
+  def restartComponent(host: String, component: String) = {
+    val cmd: Seq[String] = getDockerCommand(host, component, "restart")
+    println(s"Running command: ${cmd.mkString(" ")}")
+
+    TestUtils.runCmd(0, new File("."), cmd: _*)
+  }
+
+  def stopComponent(host: String, component: String) = {
+    val cmd: Seq[String] = getDockerCommand(host, component, "stop")
+    println(s"Running command: ${cmd.mkString(" ")}")
+
+    TestUtils.runCmd(0, new File("."), cmd: _*)
+  }
+
+  def startComponent(host: String, component: String) = {
+    val cmd: Seq[String] = getDockerCommand(host, component, "start")
     println(s"Running command: ${cmd.mkString(" ")}")
 
     TestUtils.runCmd(0, new File("."), cmd: _*)
   }
 
-  def ping(host: String, port: Int) = {
-    val response = Try { Http().singleRequest(HttpRequest(uri = 
s"http://$host:$port/ping";)).futureValue }.toOption
+  def ping(host: String, port: Int, path: String = "/") = {
+    val response = Try {
+      Http().singleRequest(HttpRequest(uri = 
s"http://$host:$port$path";)).futureValue
+    }.toOption
 
     response.map { res =>
       (res.status, Unmarshal(res).to[String].futureValue)
@@ -87,17 +121,34 @@ class ShootComponentsTests extends FlatSpec with Matchers 
with WskTestHelpers wi
     val host = 
WhiskProperties.getProperty("controller.hosts").split(",")(instance)
     val port = WhiskProperties.getControllerBasePort + instance
 
-    val res = ping(host, port)
+    val res = ping(host, port, "/ping")
     res == Some((StatusCodes.OK, "pong"))
   }
 
+  def isDBAlive(instance: Int): Boolean = {
+    require(instance >= 0 && instance < 2, "DB instance not known.")
+
+    val host = WhiskProperties.getProperty("db.hosts").split(",")(instance)
+    val port = WhiskProperties.getDBPort + instance
+
+    val res = ping(host, port)
+    res == Some(
+      (
+        StatusCodes.OK,
+        
"{\"couchdb\":\"Welcome\",\"version\":\"2.1.1\",\"features\":[\"scheduler\"],\"vendor\":{\"name\":\"The
 Apache Software Foundation\"}}\n"))
+  }
+
   def doRequests(amount: Int, actionName: String): Seq[(Int, Int)] = {
     (0 until amount).map { i =>
       val start = Instant.now
 
       // Do POSTs and GETs
-      val invokeExit = Future { wsk.action.invoke(actionName, expectedExitCode 
= TestUtils.DONTCARE_EXIT).exitCode }
-      val getExit = Future { wsk.action.get(actionName, expectedExitCode = 
TestUtils.DONTCARE_EXIT).exitCode }
+      val invokeExit = Future {
+        wsk.action.invoke(actionName, expectedExitCode = 
TestUtils.DONTCARE_EXIT).exitCode
+      }
+      val getExit = Future {
+        wsk.action.get(actionName, expectedExitCode = 
TestUtils.DONTCARE_EXIT).exitCode
+      }
 
       println(s"Done rerquests with responses: invoke: 
${invokeExit.futureValue} and get: ${getExit.futureValue}")
 
@@ -150,4 +201,119 @@ class ShootComponentsTests extends FlatSpec with Matchers 
with WskTestHelpers wi
       isControllerAlive(1) shouldBe true
     }
   }
+
+  behavior of "CouchDB HA"
+
+  it should "be able to retrieve documents from couchdb1 if couchdb0 goes 
down" in withAssetCleaner(wskprops) {
+    (wp, assetHelper) =>
+      if (WhiskProperties.getProperty(WhiskConfig.dbInstances).toInt >= 2) {
+
+        val dbName: String = dbWhiskAuth
+        val db1 = new ExtendedCouchDbRestClient(
+          dbProtocol,
+          dbHostsList.split(",")(0),
+          dbPort.toInt,
+          dbUsername,
+          dbPassword,
+          dbName)
+        val db2 = new ExtendedCouchDbRestClient(
+          dbProtocol,
+          dbHostsList.split(",")(1),
+          dbPort.toInt,
+          dbUsername,
+          dbPassword,
+          dbName)
+
+        println("Creating test document")
+        val docId = "couchdb-ha-test"
+        val testDocument = JsObject(
+          "_id" -> docId.toJson,
+          "namespaces" -> JsArray(
+            JsObject(
+              "name" -> docId.toJson,
+              "uuid" -> "789c46b1-71f6-4ed5-8c54-816aa4f8c502".toJson,
+              "key" -> 
"abczO3xZCLrMN6v2BKK1dXYFpXlPkccOFqm12CdAsMgRU4VrNZ9lyGVCGuMDGIwP".toJson)),
+          "subject" -> docId.toJson)
+
+        val docId2 = "couchdb-ha-test2"
+        val testDocument2 = JsObject(
+          "_id" -> docId2.toJson,
+          "namespaces" -> JsArray(
+            JsObject(
+              "name" -> docId2.toJson,
+              "uuid" -> "789c46b1-71f6-4ed5-8c54-816aa4f8c502".toJson,
+              "key" -> 
"abczO3xZCLrMN6v2BKK1dXYFpXlPkccOFqm12CdAsMgRU4VrNZ9lyGVCGuMDGIwP".toJson)),
+          "subject" -> docId2.toJson)
+
+        isDBAlive(0) shouldBe true
+
+        retry(db1.putDoc(docId, testDocument))
+
+        stopComponent(couchDB0DockerHost, "couchdb")
+
+        retry({
+          isDBAlive(0) shouldBe false
+        }, 100, Some(100.milliseconds))
+
+        retry({
+          val result = Await.result(db2.getDoc(docId), 15.seconds)
+          result should be('right)
+          result.right.get.getFields("_id") shouldBe 
testDocument.getFields("_id")
+        })
+
+        retry(
+          {
+            val result = Await.result(
+              db2.executeView("subjects", "identities")(startKey = 
List(docId), endKey = List(docId)),
+              15.seconds)
+            result should be('right)
+            result.right.get.getFields("_id") shouldBe 
testDocument.getFields("namespace")
+          },
+          100,
+          Some(100.milliseconds))
+
+        retry(db2.putDoc(docId2, testDocument2))
+
+        isDBAlive(0) shouldBe false
+
+        startComponent(couchDB0DockerHost, "couchdb")
+
+        retry({
+          isDBAlive(0) shouldBe true
+        }, 100, Some(100.milliseconds))
+
+        retry({
+          val result = Await.result(db1.getDoc(docId2), 15.seconds)
+          result should be('right)
+          result.right.get.getFields("_id") shouldBe 
testDocument2.getFields("_id")
+        })
+
+        retry(
+          {
+            val result = Await.result(
+              db1.executeView("subjects", "identities")(startKey = 
List(docId2), endKey = List(docId2)),
+              15.seconds)
+            result should be('right)
+            result.right.get.getFields("_id") shouldBe 
testDocument2.getFields("namespace")
+          },
+          100,
+          Some(100.milliseconds))
+
+        val doc1Result = Await.result(db1.getDoc(docId), 15.seconds)
+        val doc2Result = Await.result(db1.getDoc(docId2), 15.seconds)
+        val rev1 = 
doc1Result.right.get.fields.get("_rev").get.convertTo[String]
+        val rev2 = 
doc2Result.right.get.fields.get("_rev").get.convertTo[String]
+        Await.result(db1.deleteDoc(docId, rev1), 15.seconds)
+        Await.result(db1.deleteDoc(docId2, rev2), 15.seconds)
+
+        retry({
+          val result = Await.result(db1.getDoc(docId), 15.seconds)
+          result should be('left)
+        })
+        retry({
+          val result = Await.result(db1.getDoc(docId2), 15.seconds)
+          result should be('left)
+        })
+      }
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to